Merge pull request #21 from status-im/multiproc

Parallel execution of simulations with parameters to select number of threads
This commit is contained in:
Leo 2023-03-14 16:49:04 +01:00 committed by GitHub
commit 8c9106d03d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 31 additions and 17 deletions

View File

@ -104,11 +104,12 @@ class Simulator:
def initLogger(self):
"""It initializes the logger."""
logger = logging.getLogger("DAS")
logger.setLevel(self.logLevel)
ch = logging.StreamHandler()
ch.setLevel(self.logLevel)
ch.setFormatter(CustomFormatter())
logger.addHandler(ch)
if len(logger.handlers) == 0:
logger.setLevel(self.logLevel)
ch = logging.StreamHandler()
ch.setLevel(self.logLevel)
ch.setFormatter(CustomFormatter())
logger.addHandler(ch)
self.logger = logger

View File

@ -20,6 +20,10 @@ dumpXML = 1
visualization = 1
logLevel = logging.INFO
# number of parallel workers. -1: all cores; 1: sequential
# for more details, see joblib.Parallel
numJobs = 3
# Number of simulation runs with the same parameters for statistical relevance
runs = range(10)

View File

@ -2,8 +2,27 @@
import time, sys, random, copy
import importlib
from joblib import Parallel, delayed
from DAS import *
# Parallel execution:
# The code currently uses 'joblib' to execute on multiple cores. For other options such as 'ray', see
# https://stackoverflow.com/questions/9786102/how-do-i-parallelize-a-simple-python-loop
# For fixing logging issues in parallel execution, see
# https://stackoverflow.com/questions/58026381/logging-nested-functions-using-joblib-parallel-and-delayed-calls
# and https://github.com/joblib/joblib/issues/1017
def runOnce(sim, config, shape):
if not config.deterministic:
random.seed(datetime.now())
sim.initLogger()
sim.resetShape(shape)
sim.initValidators()
sim.initNetwork()
result = sim.run()
sim.logger.info("Shape: %s ... Block Available: %d in %d steps" % (str(sim.shape.__dict__), result.blockAvailable, len(result.missingVector)), extra=sim.format)
return result
def study():
if len(sys.argv) < 2:
@ -24,7 +43,6 @@ def study():
sim = Simulator(shape, config)
sim.initLogger()
results = []
simCnt = 0
now = datetime.now()
execID = now.strftime("%Y-%m-%d_%H-%M-%S_")+str(random.randint(100,999))
@ -32,20 +50,11 @@ def study():
sim.logger.info("Starting simulations:", extra=sim.format)
start = time.time()
for shape in config.nextShape():
if not config.deterministic:
random.seed(datetime.now())
results = Parallel(config.numJobs)(delayed(runOnce)(sim, config, shape) for shape in config.nextShape())
sim.resetShape(shape)
sim.initValidators()
sim.initNetwork()
result = sim.run()
sim.logger.info("Shape: %s ... Block Available: %d in %d steps" % (str(sim.shape.__dict__), result.blockAvailable, len(result.missingVector)), extra=sim.format)
results.append(copy.deepcopy(result))
simCnt += 1
end = time.time()
sim.logger.info("A total of %d simulations ran in %d seconds" % (simCnt, end-start), extra=sim.format)
sim.logger.info("A total of %d simulations ran in %d seconds" % (len(results), end-start), extra=sim.format)
if config.dumpXML:
for res in results: