Merge pull request #66 from codex-storage/incremental-simulation

Implement stop and resume functionality
This commit is contained in:
Leo 2024-05-14 11:58:45 +02:00 committed by GitHub
commit ca9a0fa1e4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 127 additions and 0 deletions

View File

@ -273,6 +273,7 @@ class Simulator:
trafficStatsVector = []
malicious_nodes_not_added_count = 0
steps = 0
while(True):
missingVector.append(missingSamples)
self.logger.debug("Expected Samples: %d" % expected, extra=self.format)
@ -352,6 +353,7 @@ class Simulator:
break
steps += 1
for i in range(0,self.shape.numberNodes):
if not self.validators[i].amIaddedToQueue :
malicious_nodes_not_added_count += 1

125
study.py
View File

@ -5,6 +5,9 @@ import importlib
import subprocess
from joblib import Parallel, delayed
from DAS import *
import os
import pickle
import uuid
# Parallel execution:
# The code currently uses 'joblib' to execute on multiple cores. For other options such as 'ray', see
@ -29,6 +32,13 @@ def runOnce(config, shape, execID):
shape.setSeed(config.randomSeed+"-"+str(shape))
random.seed(shape.randomSeed)
backup_folder = f"results/{execID}/backup"
if not os.path.exists(backup_folder):
os.makedirs(backup_folder)
backup_file = os.path.join(backup_folder, f"simulation_data_{shape}.pkl")
with open(backup_file, 'ab') as f:
pickle.dump(shape.__dict__, f)
sim = Simulator(shape, config, execID)
sim.initLogger()
sim.initValidators()
@ -43,9 +53,124 @@ def runOnce(config, shape, execID):
visual = Visualizor(execID, config, [result])
visual.plotAll()
with open(backup_file, 'ab') as f:
pickle.dump("completed", f)
return result
def check_simulation_completion(state_file):
    """Inspect the backup folder and report which simulations finished.

    Each backup ``.pkl`` file is a stream of pickled items: the first item is
    the shape's ``__dict__`` written when the run starts, and the string
    ``"completed"`` is appended as the final item when the run finishes.

    Parameters
    ----------
    state_file : str
        A path whose parent directory contains the ``backup`` folder
        (the caller passes ``results/<execID>/backup``).

    Returns
    -------
    tuple
        ``(all_completed, incomplete_files, completed_files, completed_shapes)``
        where ``all_completed`` is a bool, the two file lists hold full paths,
        and ``completed_shapes`` holds the first pickled item of each
        completed file.
    """
    backup_dir = os.path.join(os.path.dirname(state_file), "backup")
    incomplete_files = []
    completed_files = []
    completed_shapes = []
    # BUG FIX: the original returned a bare `False` here, but callers unpack
    # four values -- return the full tuple so the "no backup yet" path works.
    if not os.path.exists(backup_dir):
        return False, incomplete_files, completed_files, completed_shapes
    all_completed = True
    for filename in sorted(os.listdir(backup_dir), reverse=True):
        if not filename.endswith(".pkl"):
            continue
        full_path = os.path.join(backup_dir, filename)
        try:
            # Read every pickled item until EOF; the file is a sequence of
            # dumps, not a single object.
            items = []
            with open(full_path, 'rb') as f:
                while True:
                    try:
                        items.append(pickle.load(f))
                    except EOFError:
                        break
            # BUG FIX: guard the empty/truncated-file case -- the original
            # indexed items[-1] unconditionally and could raise IndexError.
            if not items or items[-1] != "completed":
                all_completed = False
                incomplete_files.append(full_path)
            else:
                completed_files.append(full_path)
                completed_shapes.append(items[0])
        except (OSError, pickle.UnpicklingError) as e:
            # A corrupt or unreadable backup file makes the overall state
            # unknown; report not-completed and stop scanning.
            print(f"Error loading state from {full_path}: {e}")
            all_completed = False
            break
    return all_completed, incomplete_files, completed_files, completed_shapes
def start_simulation(execID, completed_files, completed_shapes, incomplete_files):
    """Run every configured shape that is not already marked completed.

    A shape counts as completed when its attribute dict (with ``randomSeed``
    ignored) matches one of ``completed_shapes``; matching shapes are skipped
    and the rest are executed in parallel via joblib.  Heatmap plots are
    produced afterwards when the config enables visualization.
    ``completed_files`` and ``incomplete_files`` are accepted for interface
    compatibility but are not used in this body.
    """
    config = importlib.import_module("smallConf")
    logger = initLogger(config)
    format = {"entity": "Study"}
    results = []

    # Ensure the result directories exist before anything writes into them.
    if not os.path.exists("results"):
        os.makedirs("results")
    dir = "results/"+execID
    if not os.path.exists(dir):
        os.makedirs(dir)

    # Optionally snapshot the current git state next to the results.
    if config.saveGit:
        with open(dir+"/git.diff", 'w') as f:
            subprocess.run(["git", "diff"], stdout=f)
        with open(dir+"/git.describe", 'w') as f:
            subprocess.run(["git", "describe", "--always"], stdout=f)

    logger.info("Starting simulations:", extra=format)
    start = time.time()

    ignore_keys = ['randomSeed']
    for shape in config.nextShape():
        # Compare shapes with the seed stripped so reruns with a different
        # seed still count as the same configuration.
        candidate = shape.__dict__.copy()
        for key in ignore_keys:
            del candidate[key]
        already_done = any(
            all(candidate[key] == done_shape[key]
                for key in candidate.keys() if key not in ignore_keys)
            for done_shape in completed_shapes
        )
        if already_done:
            logger.info("Skipping simulation for shape (already completed): %s" % (str(shape.__dict__)), extra=format)
        else:
            results.append(delayed(runOnce)(config, shape, execID))

    results = Parallel(config.numJobs)(results)
    end = time.time()
    logger.info("A total of %d simulations ran in %d seconds" % (len(results), end-start), extra=format)

    if config.visualization:
        vis = Visualizer(execID, config)
        vis.plotHeatmaps()
        visual = Visualizor(execID, config, results)
        visual.plotHeatmaps("nn", "fr")
def study():
restart_path = None
for arg in sys.argv[1:]:
if arg.startswith("--restart="):
restart_path = arg[len("--restart="):]
if restart_path:
execID = restart_path.split("/")[1]
state_file = f"results/{execID}/backup"
all_completed, incomplete_files, completed_files, completed_shapes = check_simulation_completion(state_file)
current_shapes = []
config = importlib.import_module("smallConf")
completed_shapes_without_seed = completed_shapes
for shape in config.nextShape():
shape_dict = copy.deepcopy(shape.__dict__)
del shape_dict['randomSeed']
current_shapes.append(shape_dict)
for shape in completed_shapes_without_seed:
if 'randomSeed' in shape:
del shape['randomSeed']
completed_set = {frozenset(shape.items()) for shape in completed_shapes_without_seed}
current_set = {frozenset(shape.items()) for shape in current_shapes}
if all_completed and completed_set == current_set:
print("Simulation is already completed.")
sys.exit(0)
else:
print("Restarting simulations.")
start_simulation(execID, completed_files, completed_shapes, incomplete_files)
sys.exit(0)
if len(sys.argv) < 2:
print("You need to pass a configuration file in parameter")
exit(1)