Merge pull request #28 from status-im/cleanup

Code cleanup
This commit is contained in:
Csaba Kiraly 2023-03-27 11:41:19 +02:00 committed by GitHub
commit bcf3098e9f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 57 additions and 68 deletions

View File

@ -10,21 +10,12 @@ class Observer:
self.config = config
self.format = {"entity": "Observer"}
self.logger = logger
self.block = []
self.rows = []
self.columns = []
self.goldenData = []
self.broadcasted = []
def reset(self):
"""It resets all the gathered data to zeros."""
self.block = [0] * self.config.blockSize * self.config.blockSize
self.goldenData = [0] * self.config.blockSize * self.config.blockSize
self.rows = [0] * self.config.blockSize
self.columns = [0] * self.config.blockSize
self.broadcasted = Block(self.config.blockSize)
def checkRowsColumns(self, validators):
"""It checks how many validators have been assigned to each row and column."""
for val in validators:
@ -39,11 +30,6 @@ class Observer:
if self.rows[i] == 0 or self.columns[i] == 0:
self.logger.warning("There is a row/column that has not been assigned", extra=self.format)
def setGoldenData(self, block):
"""Stores the original real data to compare it with future situations."""
for i in range(self.config.blockSize*self.config.blockSize):
self.goldenData[i] = block.data[i]
def checkBroadcasted(self):
"""It checks how many broadcasted samples are still missing in the network."""
zeros = 0

View File

@ -1,7 +1,7 @@
bitarray==2.6.0
DAS==0.29.0
dicttoxml==1.7.16
matplotlib==3.6.2
networkx==3.0
numpy==1.23.5
seaborn==0.12.2
joblib==1.2.0

View File

@ -2,6 +2,7 @@
import networkx as nx
import logging, random
from functools import partial, partialmethod
from datetime import datetime
from statistics import mean
from DAS.tools import *
@ -24,10 +25,21 @@ class Simulator:
self.proposerID = 0
self.glob = []
# In GossipSub the initiator might push messages without participating in the mesh.
# proposerPublishOnly regulates this behavior. If set to true, the proposer is not
# part of the p2p distribution graph, only pushes segments to it. If false, the proposer
# might get back segments from other peers since links are symmetric.
self.proposerPublishOnly = True
# If proposerPublishOnly == True, this regulates how many copies of each segment are
# pushed out by the proposer.
# 1: the data is sent out exactly once on rows and once on columns (2 copies in total)
# self.shape.netDegree: default behavior similar (but not same) to previous code
self.proposerPublishTo = self.shape.netDegree
def initValidators(self):
"""It initializes all the validators in the network."""
self.glob = Observer(self.logger, self.shape)
self.glob.reset()
self.validators = []
if self.config.evenLineDistribution:
@ -55,7 +67,6 @@ class Simulator:
val = Validator(i, int(not i!=0), self.logger, self.shape)
if i == self.proposerID:
val.initBlock()
self.glob.setGoldenData(val.block)
else:
val.logIDs()
self.validators.append(val)
@ -137,6 +148,11 @@ class Simulator:
def initLogger(self):
"""It initializes the logger."""
logging.TRACE = 5
logging.addLevelName(logging.TRACE, 'TRACE')
logging.Logger.trace = partialmethod(logging.Logger.log, logging.TRACE)
logging.trace = partial(logging.log, logging.TRACE)
logger = logging.getLogger("DAS")
if len(logger.handlers) == 0:
logger.setLevel(self.logLevel)
@ -146,30 +162,6 @@ class Simulator:
logger.addHandler(ch)
self.logger = logger
def resetShape(self, shape):
"""It resets the parameters of the simulation."""
self.shape = shape
self.result = Result(self.shape)
for val in self.validators:
val.shape.failureRate = shape.failureRate
val.shape.chi = shape.chi
val.shape.vpn1 = shape.vpn1
val.shape.vpn2 = shape.vpn2
# In GossipSub the initiator might push messages without participating in the mesh.
# proposerPublishOnly regulates this behavior. If set to true, the proposer is not
# part of the p2p distribution graph, only pushes segments to it. If false, the proposer
# might get back segments from other peers since links are symmetric.
self.proposerPublishOnly = True
# If proposerPublishOnly == True, this regulates how many copies of each segment are
# pushed out by the proposer.
# 1: the data is sent out exactly once on rows and once on columns (2 copies in total)
# self.shape.netDegree: default behavior similar (but not same) to previous code
self.proposerPublishTo = self.shape.netDegree
def run(self):
"""It runs the main simulation until the block is available or it gets stucked."""
self.glob.checkRowsColumns(self.validators)

View File

@ -61,14 +61,16 @@ class Validator:
self.logger.error("Chi has to be smaller than %d" % self.shape.blockSize, extra=self.format)
else:
if amIproposer:
self.nodeClass = 0
self.rowIDs = range(shape.blockSize)
self.columnIDs = range(shape.blockSize)
else:
#if shape.deterministic:
# random.seed(self.ID)
vpn = self.shape.vpn1 if (self.ID <= shape.numberNodes * shape.class1ratio) else self.shape.vpn2
self.rowIDs = rows if rows else unionOfSamples(range(self.shape.blockSize), self.shape.chi, vpn)
self.columnIDs = columns if columns else unionOfSamples(range(self.shape.blockSize), self.shape.chi, vpn)
self.nodeClass = 1 if (self.ID <= shape.numberNodes * shape.class1ratio) else 2
self.vpn = self.shape.vpn1 if (self.nodeClass == 1) else self.shape.vpn2
self.rowIDs = rows if rows else unionOfSamples(range(self.shape.blockSize), self.shape.chi, self.vpn)
self.columnIDs = columns if columns else unionOfSamples(range(self.shape.blockSize), self.shape.chi, self.vpn)
self.rowNeighbors = collections.defaultdict(dict)
self.columnNeighbors = collections.defaultdict(dict)
@ -83,7 +85,7 @@ class Validator:
# TODO: this should be a parameter
if self.amIproposer:
self.bwUplink = shape.bwUplinkProd
elif self.ID <= shape.numberNodes * shape.class1ratio:
elif self.nodeClass == 1:
self.bwUplink = shape.bwUplink1
else:
self.bwUplink = shape.bwUplink2
@ -155,12 +157,12 @@ class Validator:
if src in self.columnNeighbors[cID]:
self.columnNeighbors[cID][src].receiving[rID] = 1
if not self.receivedBlock.getSegment(rID, cID):
self.logger.debug("Recv new: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format)
self.logger.trace("Recv new: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format)
self.receivedBlock.setSegment(rID, cID)
if self.perNodeQueue or self.perNeighborQueue:
self.receivedQueue.append((rID, cID))
else:
self.logger.debug("Recv DUP: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format)
self.logger.trace("Recv DUP: %d->%d: %d,%d", src, self.ID, rID, cID, extra=self.format)
# self.statsRxDuplicateInSlot += 1
self.statsRxInSlot += 1
@ -183,7 +185,7 @@ class Validator:
if self.amIproposer == 1:
self.logger.error("I am a block proposer", extra=self.format)
else:
self.logger.debug("Receiving the data...", extra=self.format)
self.logger.trace("Receiving the data...", extra=self.format)
#self.logger.debug("%s -> %s", self.block.data, self.receivedBlock.data, extra=self.format)
self.block.merge(self.receivedBlock)
@ -219,7 +221,7 @@ class Validator:
def sendSegmentToNeigh(self, rID, cID, neigh):
"""Send segment to a neighbor (without checks)."""
self.logger.debug("sending %d/%d to %d", rID, cID, neigh.node.ID, extra=self.format)
self.logger.trace("sending %d/%d to %d", rID, cID, neigh.node.ID, extra=self.format)
i = rID if neigh.dim else cID
neigh.sent[i] = 1
neigh.node.receiveSegment(rID, cID, self.ID)
@ -454,7 +456,7 @@ class Validator:
# be queued after successful repair.
for i in range(len(rep)):
if rep[i]:
self.logger.debug("Rep: %d,%d", id, i, extra=self.format)
self.logger.trace("Rep: %d,%d", id, i, extra=self.format)
self.addToSendQueue(id, i)
# self.statsRepairInSlot += rep.count(1)
@ -472,7 +474,7 @@ class Validator:
# be queued after successful repair.
for i in range(len(rep)):
if rep[i]:
self.logger.debug("Rep: %d,%d", i, id, extra=self.format)
self.logger.trace("Rep: %d,%d", i, id, extra=self.format)
self.addToSendQueue(i, id)
# self.statsRepairInSlot += rep.count(1)

View File

@ -12,17 +12,32 @@ from DAS import *
# https://stackoverflow.com/questions/58026381/logging-nested-functions-using-joblib-parallel-and-delayed-calls
# and https://github.com/joblib/joblib/issues/1017
def runOnce(sim, config, shape):
def initLogger(config):
"""It initializes the logger."""
logger = logging.getLogger("Study")
logger.setLevel(config.logLevel)
ch = logging.StreamHandler()
ch.setLevel(config.logLevel)
ch.setFormatter(CustomFormatter())
logger.addHandler(ch)
return logger
def runOnce(config, shape, execID):
if config.deterministic:
shape.setSeed(config.randomSeed+"-"+str(shape))
random.seed(shape.randomSeed)
sim = Simulator(shape, config)
sim.initLogger()
sim.resetShape(shape)
sim.initValidators()
sim.initNetwork()
result = sim.run()
sim.logger.info("Shape: %s ... Block Available: %d in %d steps" % (str(sim.shape.__dict__), result.blockAvailable, len(result.missingVector)), extra=sim.format)
if config.dumpXML:
result.dump(execID)
return result
def study():
@ -40,29 +55,23 @@ def study():
print("You need to pass a configuration file in parameter")
exit(1)
shape = Shape(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0)
sim = Simulator(shape, config)
sim.initLogger()
logger = initLogger(config)
format = {"entity": "Study"}
results = []
now = datetime.now()
execID = now.strftime("%Y-%m-%d_%H-%M-%S_")+str(random.randint(100,999))
sim.logger.info("Starting simulations:", extra=sim.format)
logger.info("Starting simulations:", extra=format)
start = time.time()
results = Parallel(config.numJobs)(delayed(runOnce)(sim, config, shape) for shape in config.nextShape())
results = Parallel(config.numJobs)(delayed(runOnce)(config, shape ,execID) for shape in config.nextShape())
end = time.time()
sim.logger.info("A total of %d simulations ran in %d seconds" % (len(results), end-start), extra=sim.format)
if config.dumpXML:
for res in results:
res.dump(execID)
sim.logger.info("Results dumped into results/%s/" % (execID), extra=sim.format)
logger.info("A total of %d simulations ran in %d seconds" % (len(results), end-start), extra=format)
if config.visualization:
vis = Visualizer(execID)
vis.plotHeatmaps()
study()
if __name__ == "__main__":
study()