Merge pull request #259 from sartography/bug/reset_study_when_completely_horked
Sometimes a workflow can be completely broken and unload-able. For ins…
This commit is contained in:
commit
1f32f5fb4d
|
@ -120,7 +120,7 @@ def restart_workflow(workflow_id, clear_data=False):
|
||||||
"""Restart a workflow with the latest spec.
|
"""Restart a workflow with the latest spec.
|
||||||
Clear data allows user to restart the workflow without previous data."""
|
Clear data allows user to restart the workflow without previous data."""
|
||||||
workflow_model: WorkflowModel = session.query(WorkflowModel).filter_by(id=workflow_id).first()
|
workflow_model: WorkflowModel = session.query(WorkflowModel).filter_by(id=workflow_id).first()
|
||||||
WorkflowProcessor(workflow_model).reset(workflow_model, clear_data=clear_data)
|
WorkflowProcessor.reset(workflow_model, clear_data=clear_data)
|
||||||
return get_workflow(workflow_model.id)
|
return get_workflow(workflow_model.id)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -173,10 +173,19 @@ class WorkflowProcessor(object):
|
||||||
else:
|
else:
|
||||||
self.is_latest_spec = False
|
self.is_latest_spec = False
|
||||||
|
|
||||||
def reset(self, workflow_model, clear_data=False):
|
@staticmethod
|
||||||
|
def reset(workflow_model, clear_data=False):
|
||||||
print('WorkflowProcessor: reset: ')
|
print('WorkflowProcessor: reset: ')
|
||||||
|
|
||||||
self.cancel_notify()
|
# Try to execute a cancel notify
|
||||||
|
try:
|
||||||
|
wp = WorkflowProcessor(workflow_model)
|
||||||
|
wp.cancel_notify() # The executes a notification to all endpoints that
|
||||||
|
except Exception as e:
|
||||||
|
app.logger.error(f"Unable to send a cancel notify for workflow %s during a reset."
|
||||||
|
f" Continuing with the reset anyway so we don't get in an unresolvable"
|
||||||
|
f" state. An %s error occured with the following information: %s" %
|
||||||
|
(workflow_model.id, e.__class__.__name__, str(e)))
|
||||||
workflow_model.bpmn_workflow_json = None
|
workflow_model.bpmn_workflow_json = None
|
||||||
if clear_data:
|
if clear_data:
|
||||||
# Clear form_data from task_events
|
# Clear form_data from task_events
|
||||||
|
@ -186,7 +195,7 @@ class WorkflowProcessor(object):
|
||||||
task_event.form_data = {}
|
task_event.form_data = {}
|
||||||
session.add(task_event)
|
session.add(task_event)
|
||||||
session.commit()
|
session.commit()
|
||||||
return self.__init__(workflow_model)
|
return WorkflowProcessor(workflow_model)
|
||||||
|
|
||||||
def __get_bpmn_workflow(self, workflow_model: WorkflowModel, spec: WorkflowSpec, validate_only=False):
|
def __get_bpmn_workflow(self, workflow_model: WorkflowModel, spec: WorkflowSpec, validate_only=False):
|
||||||
if workflow_model.bpmn_workflow_json:
|
if workflow_model.bpmn_workflow_json:
|
||||||
|
|
|
@ -731,5 +731,4 @@ class WorkflowService(object):
|
||||||
workflows = db.session.query(WorkflowModel).filter_by(study_id=study_id).all()
|
workflows = db.session.query(WorkflowModel).filter_by(study_id=study_id).all()
|
||||||
for workflow in workflows:
|
for workflow in workflows:
|
||||||
if workflow.status == WorkflowStatus.user_input_required or workflow.status == WorkflowStatus.waiting:
|
if workflow.status == WorkflowStatus.user_input_required or workflow.status == WorkflowStatus.waiting:
|
||||||
processor = WorkflowProcessor(workflow)
|
WorkflowProcessor.reset(workflow, clear_data=False)
|
||||||
processor.reset(workflow)
|
|
||||||
|
|
|
@ -54,8 +54,7 @@ class TestLookupService(BaseTest):
|
||||||
|
|
||||||
# restart the workflow, so it can pick up the changes.
|
# restart the workflow, so it can pick up the changes.
|
||||||
|
|
||||||
processor = WorkflowProcessor(workflow)
|
processor = WorkflowProcessor.reset(workflow)
|
||||||
processor.reset(workflow)
|
|
||||||
workflow = processor.workflow_model
|
workflow = processor.workflow_model
|
||||||
|
|
||||||
LookupService.lookup(workflow, "Task_Enum_Lookup", "sponsor", "sam", limit=10)
|
LookupService.lookup(workflow, "Task_Enum_Lookup", "sponsor", "sam", limit=10)
|
||||||
|
@ -92,8 +91,7 @@ class TestLookupService(BaseTest):
|
||||||
results = LookupService.lookup(workflow, task.task_spec.name, "selectedItem", "", value="apples", limit=10)
|
results = LookupService.lookup(workflow, task.task_spec.name, "selectedItem", "", value="apples", limit=10)
|
||||||
self.assertEqual(0, len(results), "We shouldn't find our fruits mixed in with our animals.")
|
self.assertEqual(0, len(results), "We shouldn't find our fruits mixed in with our animals.")
|
||||||
|
|
||||||
|
processor = WorkflowProcessor.reset(workflow, clear_data=True)
|
||||||
processor.reset(workflow, clear_data=True)
|
|
||||||
processor.do_engine_steps()
|
processor.do_engine_steps()
|
||||||
task = processor.get_ready_user_tasks()[0]
|
task = processor.get_ready_user_tasks()[0]
|
||||||
task.data = {"type": "fruit"}
|
task.data = {"type": "fruit"}
|
||||||
|
|
|
@ -213,6 +213,27 @@ class TestTasksApi(BaseTest):
|
||||||
self.assertTrue(workflow_api.spec_version.startswith("v2 "))
|
self.assertTrue(workflow_api.spec_version.startswith("v2 "))
|
||||||
self.assertTrue(workflow_api.is_latest_spec)
|
self.assertTrue(workflow_api.is_latest_spec)
|
||||||
|
|
||||||
|
def test_reset_workflow_from_broken_spec(self):
|
||||||
|
# Start the basic two_forms workflow and complete a task.
|
||||||
|
workflow = self.create_workflow('two_forms')
|
||||||
|
workflow_api = self.get_workflow_api(workflow)
|
||||||
|
self.complete_form(workflow, workflow_api.next_task, {"color": "blue"})
|
||||||
|
self.assertTrue(workflow_api.is_latest_spec)
|
||||||
|
|
||||||
|
# Break the bpmn json
|
||||||
|
workflow.bpmn_workflow_json = '{"something":"broken"}'
|
||||||
|
session.add(workflow)
|
||||||
|
session.commit()
|
||||||
|
|
||||||
|
# Try to load the workflow, we should get an error
|
||||||
|
with self.assertRaises(Exception):
|
||||||
|
workflow_api = self.complete_form(workflow, workflow_api.next_task, {"name": "Dan"})
|
||||||
|
|
||||||
|
# Now, Reset the workflow, and we should not get an error
|
||||||
|
workflow_api = self.restart_workflow_api(workflow_api, clear_data=True)
|
||||||
|
self.assertIsNotNone(workflow_api)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def test_manual_task_with_external_documentation(self):
|
def test_manual_task_with_external_documentation(self):
|
||||||
workflow = self.create_workflow('manual_task_with_external_documentation')
|
workflow = self.create_workflow('manual_task_with_external_documentation')
|
||||||
|
|
|
@ -279,7 +279,7 @@ class TestWorkflowProcessor(BaseTest):
|
||||||
self.assertFalse(processor2.is_latest_spec) # Still at version 1.
|
self.assertFalse(processor2.is_latest_spec) # Still at version 1.
|
||||||
|
|
||||||
# Do a hard reset, which should bring us back to the beginning, but retain the data.
|
# Do a hard reset, which should bring us back to the beginning, but retain the data.
|
||||||
processor2.reset(processor2.workflow_model)
|
processor2 = WorkflowProcessor.reset(processor2.workflow_model)
|
||||||
processor3 = WorkflowProcessor(processor.workflow_model)
|
processor3 = WorkflowProcessor(processor.workflow_model)
|
||||||
processor3.do_engine_steps()
|
processor3.do_engine_steps()
|
||||||
self.assertEqual("Step 1", processor3.next_task().task_spec.description)
|
self.assertEqual("Step 1", processor3.next_task().task_spec.description)
|
||||||
|
|
Loading…
Reference in New Issue