Implement stop and resume functionality

Signed-off-by: Arunima Chaudhuri <arunimachaudhuri2020@gmail.com>
Arunima Chaudhuri 2024-03-27 20:38:47 +00:00
parent c4db8e408e
commit d591c1724f
2 changed files with 61 additions and 45 deletions


@@ -281,11 +281,10 @@ class Simulator:
        if not os.path.exists(backup_folder):
            os.makedirs(backup_folder)
        backup_file = os.path.join(backup_folder, f"simulation_data_{unique_run_id}.pkl")
        with open(backup_file, 'ab') as f:
            pickle.dump(self.shape.__dict__, f)
        while True:
            vectors_data = []
            missingVector.append(missingSamples)
            self.logger.debug("Expected Samples: %d" % expected, extra=self.format)
            self.logger.debug("Missing Samples: %d" % missingSamples, extra=self.format)
@@ -362,42 +361,9 @@ class Simulator:
                self.logger.debug("The entire block is available at step %d, with failure rate %d !" % (steps, self.shape.failureRate), extra=self.format)
                missingVector.append(missingSamples)
                break
            for i in range(0, self.shape.numberNodes):
                validator_data = {
                    'validator_ID': self.validators[i].ID,
                    'rowIDs': list(self.validators[i].rowIDs),
                    'columnIDs': list(self.validators[i].columnIDs),
                    'amImalicious': self.validators[i].amImalicious,
                    'amIaddedToQueue': self.validators[i].amIaddedToQueue,
                    'msgSentCount': self.validators[i].msgSentCount,
                    'msgRecvCount': self.validators[i].msgRecvCount,
                    'sampleSentCount': self.validators[i].sampleSentCount,
                    'sampleRecvCount': self.validators[i].sampleRecvCount,
                    'restoreRowCount': self.validators[i].restoreRowCount,
                    'restoreColumnCount': self.validators[i].restoreColumnCount,
                    'repairedSampleCount': self.validators[i].repairedSampleCount,
                    'rowNeighbors': list(self.validators[i].rowNeighbors),
                    'columnNeighbors': list(self.validators[i].columnNeighbors)
                }
                vectors_data.append(validator_data)
            # Also store for initNetwork
            vectors_data += (progressVector, missingVector)
            backup_folder = f"results/{self.execID}/backup"
            if not os.path.exists(backup_folder):
                os.makedirs(backup_folder)
            backup_file = os.path.join(backup_folder, f"simulation_data_{unique_run_id}.pkl")
            with open(backup_file, 'ab') as f:
                pickle.dump(vectors_data, f)
            steps += 1
        backup_folder = f"results/{self.execID}/backup"
        if not os.path.exists(backup_folder):
            os.makedirs(backup_folder)
        backup_file = os.path.join(backup_folder, f"simulation_data_{unique_run_id}.pkl")
        with open(backup_file, 'ab') as f:  # append binary mode: adds a record after the earlier dumps
            pickle.dump("completed", f)
        for i in range(0, self.shape.numberNodes):
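
Given the write order above, a single backup file reads back as: record 0 is shape.__dict__, records 1..n hold one vectors_data snapshot per step (the validator dicts plus progressVector and missingVector), and the final record is the string "completed" once the run finishes. A hedged reading sketch, reusing read_records() from the note above:

records = read_records(f"results/{execID}/backup/simulation_data_{unique_run_id}.pkl")
finished = bool(records) and records[-1] == "completed"   # completion sentinel
shape_params = records[0] if records else None            # dumped before the step loop
per_step = records[1:-1] if finished else records[1:]     # one snapshot per step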


@@ -53,7 +53,9 @@ def check_simulation_completion(state_file):
    all_completed = True
    incomplete_files = []
    completed_files = []
    completed_shapes = []
    for filename in sorted(os.listdir(backup_dir), reverse=True):
        if not filename.endswith(".pkl"):
            continue
        full_path = os.path.join(backup_dir, filename)
@@ -63,19 +65,65 @@ def check_simulation_completion(state_file):
            while True:
                try:
                    item = pickle.load(f)
                    items.append(item)
                except EOFError:
                    break
            last_item = items[-1] if items else None  # an empty file counts as incomplete
            if last_item != "completed":
                all_completed = False
                incomplete_files.append(full_path)
            else:
                completed_files.append(full_path)
                completed_shapes.append(items[0])
        except (OSError, pickle.UnpicklingError) as e:
            print(f"Error loading state from {full_path}: {e}")
            all_completed = False
            break
    return all_completed, incomplete_files, completed_files, completed_shapes
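
The widened return value lets the caller tell finished runs from interrupted ones: items[0] is the shape.__dict__ that Simulator dumps first, and a trailing "completed" record is the completion sentinel. The same per-file test, factored into a standalone sketch (the helper name is hypothetical; the logic mirrors the code above):

def run_state(path):
    records = read_records(path)  # reader sketch shown earlier
    done = bool(records) and records[-1] == "completed"
    shape = records[0] if records else None  # shape.__dict__ is dumped first
    return done, shape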
def start_simulation(execID, completed_files, completed_shapes, incomplete_files):
    config = importlib.import_module("smallConf")
    logger = initLogger(config)
    format = {"entity": "Study"}
    results = []
    if not os.path.exists("results"):
        os.makedirs("results")
    dir = "results/" + execID
    if not os.path.exists(dir):
        os.makedirs(dir)
    if config.saveGit:
        with open(dir + "/git.diff", 'w') as f:
            subprocess.run(["git", "diff"], stdout=f)
        with open(dir + "/git.describe", 'w') as f:
            subprocess.run(["git", "describe", "--always"], stdout=f)
        subprocess.run(["cp", sys.argv[1], dir + "/"])
    logger.info("Starting simulations:", extra=format)
    start = time.time()
    for shape in config.nextShape():
        # Compare this shape's parameters against every recovered shape,
        # ignoring fields (e.g. randomSeed) that legitimately differ between runs.
        comparison_dict = shape.__dict__.copy()
        ignore_keys = ['randomSeed']
        for key in ignore_keys:
            del comparison_dict[key]
        if any(all(comparison_dict[key] == completed_shape[key] for key in comparison_dict.keys()) for completed_shape in completed_shapes):
            print(f"Skipping simulation for shape: {shape.__dict__} (already completed)")
        else:
            results.append(delayed(runOnce)(config, shape, execID))
    results = Parallel(config.numJobs)(results)
    end = time.time()
    logger.info("A total of %d simulations ran in %d seconds" % (len(results), end - start), extra=format)
    if config.visualization:
        vis = Visualizer(execID, config)
        vis.plotHeatmaps()
        visual = Visualizor(execID, config, results)
        visual.plotHeatmaps("nn", "fr")
def study():
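
The resume decision rests on the dict comparison inside start_simulation() above: a shape is skipped only when some recovered shape matches on every field except the ignored ones. A standalone sketch of that test (the function name is an assumption):

def already_completed(shape_dict, completed_shapes, ignore_keys=('randomSeed',)):
    probe = {k: v for k, v in shape_dict.items() if k not in ignore_keys}
    return any(all(done.get(k) == v for k, v in probe.items())
               for done in completed_shapes)

Using done.get(k) keeps the test from raising when an older backup predates a newly added parameter; the inline version in the commit assumes both dicts carry the same keys.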
@@ -87,12 +135,14 @@ def study():
    if restart_path:
        execID = restart_path.split("/")[1]
        state_file = f"results/{execID}/backup"
        all_completed, incomplete_files, completed_files, completed_shapes = check_simulation_completion(state_file)
        print(completed_shapes)
        if all_completed:
            print("Simulation is already completed.")
            sys.exit(0)
        else:
            print(incomplete_files)
            print("Restarting simulations.")
            start_simulation(execID, completed_files, completed_shapes, incomplete_files)
            sys.exit(0)
    if len(sys.argv) < 2: