das-research/study.py

225 lines
7.7 KiB
Python

#! /bin/python3
import time, sys, random, copy
import importlib
import subprocess
from joblib import Parallel, delayed
from DAS import *
import os
import pickle
import uuid
# Parallel execution:
# The code currently uses 'joblib' to execute on multiple cores. For other options such as 'ray', see
# https://stackoverflow.com/questions/9786102/how-do-i-parallelize-a-simple-python-loop
# For fixing logging issues in parallel execution, see
# https://stackoverflow.com/questions/58026381/logging-nested-functions-using-joblib-parallel-and-delayed-calls
# and https://github.com/joblib/joblib/issues/1017
def initLogger(config):
"""It initializes the logger."""
logger = logging.getLogger("Study")
logger.setLevel(config.logLevel)
ch = logging.StreamHandler()
ch.setLevel(config.logLevel)
ch.setFormatter(CustomFormatter())
logger.addHandler(ch)
return logger
def runOnce(config, shape, execID):
if config.deterministic:
shape.setSeed(config.randomSeed+"-"+str(shape))
random.seed(shape.randomSeed)
backup_folder = f"results/{execID}/backup"
if not os.path.exists(backup_folder):
os.makedirs(backup_folder)
backup_file = os.path.join(backup_folder, f"simulation_data_{shape}.pkl")
with open(backup_file, 'ab') as f:
pickle.dump(shape.__dict__, f)
sim = Simulator(shape, config, execID)
sim.initLogger()
sim.initValidators()
sim.initNetwork()
result = sim.run()
sim.logger.info("Shape: %s ... Block Available: %d in %d steps" % (str(sim.shape.__dict__), result.blockAvailable, len(result.missingVector)), extra=sim.format)
if config.dumpXML:
result.dump()
if config.visualization:
visual = Visualizor(execID, config, [result])
visual.plotAll()
with open(backup_file, 'ab') as f:
pickle.dump("completed", f)
return result
def check_simulation_completion(state_file):
backup_dir = os.path.join(os.path.dirname(state_file), "backup")
if not os.path.exists(backup_dir):
return False
all_completed = True
incomplete_files = []
completed_files = []
completed_shapes = []
for filename in sorted(os.listdir(backup_dir), reverse=True):
if not filename.endswith(".pkl"):
continue
full_path = os.path.join(backup_dir, filename)
try:
with open(full_path, 'rb') as f:
items = []
while True:
try:
item = pickle.load(f)
items.append(item)
except EOFError:
break
last_item = items[-1]
if last_item != "completed":
all_completed = False
incomplete_files.append(full_path)
else:
completed_files.append(full_path)
completed_shapes.append(items[0])
except (OSError, pickle.UnpicklingError) as e:
print(f"Error loading state from {full_path}: {e}")
all_completed = False
break
return all_completed, incomplete_files, completed_files, completed_shapes
def start_simulation(execID, completed_files, completed_shapes, incomplete_files):
config = importlib.import_module("smallConf")
logger = initLogger(config)
format = {"entity": "Study"}
results = []
if not os.path.exists("results"):
os.makedirs("results")
dir = "results/"+execID
if not os.path.exists(dir):
os.makedirs(dir)
if config.saveGit:
with open(dir+"/git.diff", 'w') as f:
subprocess.run(["git", "diff"], stdout=f)
with open(dir+"/git.describe", 'w') as f:
subprocess.run(["git", "describe", "--always"], stdout=f)
logger.info("Starting simulations:", extra=format)
start = time.time()
for shape in config.nextShape():
comparison_dict = shape.__dict__.copy()
ignore_keys = ['randomSeed']
for key in ignore_keys:
del comparison_dict[key]
if any(all(comparison_dict[key] == completed_shape[key] for key in comparison_dict.keys() if key not in ignore_keys) for completed_shape in completed_shapes):
logger.info("Skipping simulation for shape (already completed): %s" % (str(shape.__dict__)), extra=format)
else:
results.append(delayed(runOnce)(config, shape, execID))
results = Parallel(config.numJobs)(results)
end = time.time()
logger.info("A total of %d simulations ran in %d seconds" % (len(results), end-start), extra=format)
if config.visualization:
vis = Visualizer(execID, config)
vis.plotHeatmaps()
visual = Visualizor(execID, config, results)
visual.plotHeatmaps("nn", "fr")
def study():
restart_path = None
for arg in sys.argv[1:]:
if arg.startswith("--restart="):
restart_path = arg[len("--restart="):]
if restart_path:
execID = restart_path.split("/")[1]
state_file = f"results/{execID}/backup"
all_completed, incomplete_files, completed_files, completed_shapes = check_simulation_completion(state_file)
current_shapes = []
config = importlib.import_module("smallConf")
completed_shapes_without_seed = completed_shapes
for shape in config.nextShape():
shape_dict = copy.deepcopy(shape.__dict__)
del shape_dict['randomSeed']
current_shapes.append(shape_dict)
for shape in completed_shapes_without_seed:
if 'randomSeed' in shape:
del shape['randomSeed']
completed_set = {frozenset(shape.items()) for shape in completed_shapes_without_seed}
current_set = {frozenset(shape.items()) for shape in current_shapes}
if all_completed and completed_set == current_set:
print("Simulation is already completed.")
sys.exit(0)
else:
print("Restarting simulations.")
start_simulation(execID, completed_files, completed_shapes, incomplete_files)
sys.exit(0)
if len(sys.argv) < 2:
print("You need to pass a configuration file in parameter")
exit(1)
try:
config = importlib.import_module(sys.argv[1])
except ModuleNotFoundError as e:
try:
config = importlib.import_module(str(sys.argv[1]).replace(".py", ""))
except ModuleNotFoundError as e:
print(e)
print("You need to pass a configuration file in parameter")
exit(1)
logger = initLogger(config)
format = {"entity": "Study"}
results = []
now = datetime.now()
execID = now.strftime("%Y-%m-%d_%H-%M-%S_")+str(random.randint(100,999))
# save config and code state for reproducibility
if not os.path.exists("results"):
os.makedirs("results")
dir = "results/"+execID
if not os.path.exists(dir):
os.makedirs(dir)
if config.saveGit:
with open(dir+"/git.diff", 'w') as f:
subprocess.run(["git", "diff"], stdout=f)
with open(dir+"/git.describe", 'w') as f:
subprocess.run(["git", "describe", "--always"], stdout=f)
subprocess.run(["cp", sys.argv[1], dir+"/"])
logger.info("Starting simulations:", extra=format)
start = time.time()
results = Parallel(config.numJobs)(delayed(runOnce)(config, shape ,execID) for shape in config.nextShape())
end = time.time()
logger.info("A total of %d simulations ran in %d seconds" % (len(results), end-start), extra=format)
if config.visualization:
vis = Visualizer(execID, config)
vis.plotHeatmaps()
visual = Visualizor(execID, config, results)
visual.plotHeatmaps("nn", "fr")
visual.plotAllHeatMaps()
if __name__ == "__main__":
study()