diff --git a/DAS/simulator.py b/DAS/simulator.py
index 7d4b341..3657b03 100644
--- a/DAS/simulator.py
+++ b/DAS/simulator.py
@@ -273,6 +273,7 @@ class Simulator:
         trafficStatsVector = []
         malicious_nodes_not_added_count = 0
         steps = 0
+
         while(True):
             missingVector.append(missingSamples)
             self.logger.debug("Expected Samples: %d" % expected, extra=self.format)
@@ -352,6 +353,7 @@ class Simulator:
                 break
             steps += 1
+
         for i in range(0,self.shape.numberNodes):
             if not self.validators[i].amIaddedToQueue :
                 malicious_nodes_not_added_count += 1
diff --git a/study.py b/study.py
index 380cf30..7ad33b5 100644
--- a/study.py
+++ b/study.py
@@ -5,6 +5,9 @@ import importlib
 import subprocess
 from joblib import Parallel, delayed
 from DAS import *
+import os
+import pickle
+import uuid
 
 # Parallel execution:
 # The code currently uses 'joblib' to execute on multiple cores. For other options such as 'ray', see
@@ -29,6 +32,13 @@ def runOnce(config, shape, execID):
     shape.setSeed(config.randomSeed+"-"+str(shape))
     random.seed(shape.randomSeed)
 
+    backup_folder = f"results/{execID}/backup"  # per-shape backup dir consumed by the --restart recovery path
+    if not os.path.exists(backup_folder):
+        os.makedirs(backup_folder)
+    backup_file = os.path.join(backup_folder, f"simulation_data_{shape}.pkl")
+    with open(backup_file, 'ab') as f:  # append mode: first record in the file is this run's shape.__dict__
+        pickle.dump(shape.__dict__, f)
+
     sim = Simulator(shape, config, execID)
     sim.initLogger()
     sim.initValidators()
@@ -43,9 +53,124 @@ def runOnce(config, shape, execID):
     visual = Visualizor(execID, config, [result])
     visual.plotAll()
 
+    with open(backup_file, 'ab') as f:  # trailing "completed" sentinel marks this shape's run as finished
+        pickle.dump("completed", f)
+
     return result
+
+
+def check_simulation_completion(state_file):  # -> (all_completed, incomplete_files, completed_files, completed_shapes)
+    backup_dir = os.path.join(os.path.dirname(state_file), "backup")
+    if not os.path.exists(backup_dir):
+        return False, [], [], []  # fixed: callers unpack a 4-tuple; a bare False raised TypeError here
+
+    all_completed = True
+    incomplete_files = []
+    completed_files = []
+    completed_shapes = []
+    for filename in sorted(os.listdir(backup_dir), reverse=True):
+        if not filename.endswith(".pkl"):
+            continue
+        full_path = os.path.join(backup_dir, filename)
+        try:
+            with open(full_path, 'rb') as f:
+                items = []
+                while True:  # drain every pickled record in the file until EOF
+                    try:
+                        item = pickle.load(f)
+                        items.append(item)
+                    except EOFError:
+                        break
+                last_item = items[-1] if items else None  # fixed: an empty/truncated .pkl no longer raises IndexError
+                if last_item != "completed":
+                    all_completed = False
+                    incomplete_files.append(full_path)
+                else:
+                    completed_files.append(full_path)
+                    completed_shapes.append(items[0])  # first record is the pickled shape.__dict__ written by runOnce
+        except (OSError, pickle.UnpicklingError) as e:
+            print(f"Error loading state from {full_path}: {e}")
+            all_completed = False
+            break
+    return all_completed, incomplete_files, completed_files, completed_shapes
+
+
+def start_simulation(execID, completed_files, completed_shapes, incomplete_files):  # re-run only shapes not already completed
+    config = importlib.import_module("smallConf")
+    logger = initLogger(config)
+    format = {"entity": "Study"}
+
+    results = []
+    if not os.path.exists("results"):
+        os.makedirs("results")
+    dir = "results/"+execID
+    if not os.path.exists(dir):
+        os.makedirs(dir)
+    if config.saveGit:  # snapshot the working tree state for reproducibility
+        with open(dir+"/git.diff", 'w') as f:
+            subprocess.run(["git", "diff"], stdout=f)
+        with open(dir+"/git.describe", 'w') as f:
+            subprocess.run(["git", "describe", "--always"], stdout=f)
+
+    logger.info("Starting simulations:", extra=format)
+    start = time.time()
+    for shape in config.nextShape():
+        comparison_dict = shape.__dict__.copy()
+        ignore_keys = ['randomSeed']  # seeds differ per run and must not affect shape equality
+        for key in ignore_keys:
+            del comparison_dict[key]
+
+        if any(all(comparison_dict[key] == completed_shape.get(key) for key in comparison_dict.keys() if key not in ignore_keys) for completed_shape in completed_shapes):  # fixed: .get avoids KeyError when a recorded shape lacks a key
+            logger.info("Skipping simulation for shape (already completed): %s" % (str(shape.__dict__)), extra=format)
+        else:
+            results.append(delayed(runOnce)(config, shape, execID))
+
+    results = Parallel(config.numJobs)(results)
+    end = time.time()
+    logger.info("A total of %d simulations ran in %d seconds" % (len(results), end-start), extra=format)
+
+    if config.visualization:
+        vis = Visualizer(execID, config)
+        vis.plotHeatmaps()
+
+        visual = Visualizor(execID, config, results)
+        visual.plotHeatmaps("nn", "fr")
+
+
+def study():  # entry point for --restart=<results/execID/...>; falls through to the normal CLI path otherwise
+    restart_path = None
+    for arg in sys.argv[1:]:
+        if arg.startswith("--restart="):
+            restart_path = arg[len("--restart="):]
+
+    if restart_path:
+        execID = restart_path.split("/")[1]  # assumes the path looks like results/<execID>/... — TODO confirm
+        state_file = f"results/{execID}/backup"
+        all_completed, incomplete_files, completed_files, completed_shapes = check_simulation_completion(state_file)
+
+        current_shapes = []
+        config = importlib.import_module("smallConf")
+
+        completed_shapes_without_seed = [dict(s) for s in completed_shapes]  # fixed: copy instead of alias, so completed_shapes passed on below is not mutated
+        for shape in config.nextShape():
+            shape_dict = dict(shape.__dict__)  # fixed: shallow copy suffices; copy.deepcopy raised NameError ('copy' is never imported by this patch)
+            shape_dict.pop('randomSeed', None)  # fixed: tolerate shapes with no seed attribute
+            current_shapes.append(shape_dict)
+        for shape in completed_shapes_without_seed:
+            if 'randomSeed' in shape:
+                del shape['randomSeed']
+
+        completed_set = {frozenset(shape.items()) for shape in completed_shapes_without_seed}  # assumes all shape values are hashable — TODO confirm
+        current_set = {frozenset(shape.items()) for shape in current_shapes}
+
+        if all_completed and completed_set == current_set:
+            print("Simulation is already completed.")
+            sys.exit(0)
+        else:
+            print("Restarting simulations.")
+            start_simulation(execID, completed_files, completed_shapes, incomplete_files)
+            sys.exit(0)
+
 if len(sys.argv) < 2:
     print("You need to pass a configuration file in parameter")
     exit(1)