Restart workflow without form data. Committing so Dan can check it out

This commit is contained in:
mike cullerton 2021-01-14 15:32:14 -05:00
parent 99c1d1b129
commit 6a9e6d3570
5 changed files with 53 additions and 37 deletions

View File

@ -852,16 +852,16 @@ paths:
operationId: crc.api.workflow.get_workflow
summary: Returns a workflow, can also be used to do a soft or hard reset on the workflow.
parameters:
- name: soft_reset
- name: clear_data
in: query
required: false
description: Set this to true to use the latest workflow specification to load minor modifications to the spec.
description: Set this to true to clear data when starting workflow.
schema:
type: boolean
- name: hard_reset
- name: reload_spec
in: query
required: false
description: Set this to true to reset the workflow
description: Set this to true to load latest workflow spec.
schema:
type: boolean
- name: do_engine_steps

View File

@ -96,27 +96,35 @@ def delete_workflow_specification(spec_id):
session.commit()
def get_workflow(workflow_id, soft_reset=False, hard_reset=False, do_engine_steps=True):
def get_workflow(workflow_id, reload_spec=False, clear_data=False, do_engine_steps=True):
"""Soft reset will attempt to update to the latest spec without starting over,
Hard reset will update to the latest spec and start from the beginning.
Read Only will return the workflow in a read only state, without running any
engine tasks or logging any events. """
workflow_model: WorkflowModel = session.query(WorkflowModel).filter_by(id=workflow_id).first()
processor = WorkflowProcessor(workflow_model)
if soft_reset or hard_reset:
processor = WorkflowProcessor(workflow_model, reload_spec=reload_spec, clear_data=clear_data)
if reload_spec or clear_data:
try:
processor.cancel_notify()
except Exception as e:
raise e
finally:
# In the event of a reset, ALWAYS allow the reset, even if the cancel_notify fails for some reason.
processor = WorkflowProcessor(workflow_model, soft_reset=soft_reset, hard_reset=hard_reset)
processor = WorkflowProcessor(workflow_model, reload_spec=reload_spec, clear_data=clear_data)
if do_engine_steps:
processor.do_engine_steps()
processor.save()
WorkflowService.update_task_assignments(processor)
workflow_api_model = WorkflowService.processor_to_workflow_api(processor)
if clear_data:
remove_keys = []
for datum in workflow_api_model.next_task.data:
if datum != 'current_user':
remove_keys.append(datum)
if len(remove_keys) > 0:
for key in remove_keys:
del(workflow_api_model.next_task.data[key])
return WorkflowApiSchema().dump(workflow_api_model)

View File

@ -21,6 +21,7 @@ import crc
from crc import session, app
from crc.api.common import ApiError
from crc.models.file import FileDataModel, FileModel, FileType
from crc.models.task_event import TaskEventModel
from crc.models.workflow import WorkflowStatus, WorkflowModel, WorkflowSpecDependencyFile
from crc.scripts.script import Script
from crc.services.file_service import FileService
@ -151,7 +152,7 @@ class WorkflowProcessor(object):
VALIDATION_PROCESS_KEY = "validate_only"
def __init__(self, workflow_model: WorkflowModel,
soft_reset=False, hard_reset=False, validate_only=False):
reload_spec=False, clear_data=False, validate_only=False):
"""Create a Workflow Processor based on the serialized information available in the workflow model.
If soft_reset is set to true, it will try to use the latest version of the workflow specification
without resetting to the beginning of the workflow. This will work for some minor changes to the spec.
@ -162,7 +163,7 @@ class WorkflowProcessor(object):
self.workflow_model = workflow_model
if soft_reset or len(workflow_model.dependencies) == 0: # Depenencies of 0 means the workflow was never started.
if reload_spec or len(workflow_model.dependencies) == 0: # Depenencies of 0 means the workflow was never started.
self.spec_data_files = FileService.get_spec_data_files(
workflow_spec_id=workflow_model.workflow_spec_id)
spec = self.get_spec(self.spec_data_files, workflow_model.workflow_spec_id)
@ -196,13 +197,17 @@ class WorkflowProcessor(object):
" '%s' version %s, due to a mis-placed or missing task '%s'" %
(self.workflow_spec_id, self.get_version_string(), str(ke)) +
" This is very likely due to a soft reset where there was a structural change.")
if hard_reset:
# Now that the spec is loaded, get the data and rebuild the bpmn with the new details
self.hard_reset()
if clear_data:
# Clear form_data from task_events
task_events = session.query(TaskEventModel). \
filter(TaskEventModel.workflow_id == workflow_model.id).all()
for task_event in task_events:
task_event.form_data = {}
session.add(task_event)
session.commit()
workflow_model.bpmn_workflow_json = WorkflowProcessor._serializer.serialize_workflow(self.bpmn_workflow)
self.save()
if soft_reset:
self.save()
# set whether this is the latest spec file.
if self.spec_data_files == FileService.get_spec_data_files(workflow_spec_id=workflow_model.workflow_spec_id):
@ -355,18 +360,18 @@ class WorkflowProcessor(object):
else:
return WorkflowStatus.waiting
def hard_reset(self):
"""Recreate this workflow. This will be useful when a workflow specification changes.
"""
self.spec_data_files = FileService.get_spec_data_files(workflow_spec_id=self.workflow_spec_id)
new_spec = WorkflowProcessor.get_spec(self.spec_data_files, self.workflow_spec_id)
new_bpmn_workflow = BpmnWorkflow(new_spec, script_engine=self._script_engine)
new_bpmn_workflow.data = self.bpmn_workflow.data
try:
new_bpmn_workflow.do_engine_steps()
except WorkflowException as we:
raise ApiError.from_task_spec("hard_reset_engine_steps_error", str(we), we.sender)
self.bpmn_workflow = new_bpmn_workflow
# def hard_reset(self):
# """Recreate this workflow. This will be useful when a workflow specification changes.
# """
# self.spec_data_files = FileService.get_spec_data_files(workflow_spec_id=self.workflow_spec_id)
# new_spec = WorkflowProcessor.get_spec(self.spec_data_files, self.workflow_spec_id)
# new_bpmn_workflow = BpmnWorkflow(new_spec, script_engine=self._script_engine)
# new_bpmn_workflow.data = self.bpmn_workflow.data
# try:
# new_bpmn_workflow.do_engine_steps()
# except WorkflowException as we:
# raise ApiError.from_task_spec("hard_reset_engine_steps_error", str(we), we.sender)
# self.bpmn_workflow = new_bpmn_workflow
def get_status(self):
return self.status_of(self.bpmn_workflow)

View File

@ -341,13 +341,14 @@ class BaseTest(unittest.TestCase):
session.commit()
return approval
def get_workflow_api(self, workflow, soft_reset=False, hard_reset=False, do_engine_steps=True, user_uid="dhf8r"):
def get_workflow_api(self, workflow, reload_spec=False, clear_data=False, do_engine_steps=True, user_uid="dhf8r"):
user = session.query(UserModel).filter_by(uid=user_uid).first()
self.assertIsNotNone(user)
rv = self.app.get(f'/v1.0/workflow/{workflow.id}'
f'?soft_reset={str(soft_reset)}'
f'&hard_reset={str(hard_reset)}'
f'&do_engine_steps={str(do_engine_steps)}',
url = (f'/v1.0/workflow/{workflow.id}'
f'?clear_data={str(clear_data)}'
f'&reload_spec={str(reload_spec)}'
f'&do_engine_steps={str(do_engine_steps)}')
rv = self.app.get(url,
headers=self.logged_in_headers(user),
content_type="application/json")
self.assert_success(rv)

View File

@ -5,14 +5,16 @@ class TestMessageEvent(BaseTest):
def test_message_event(self):
# self.load_example_data()
workflow = self.create_workflow('message_event')
first_task = self.get_workflow_api(workflow).next_task
self.assertEqual('Activity_GetData', first_task.name)
workflow_api = self.get_workflow_api(workflow, clear_data=True)
result = self.complete_form(workflow_api, first_task, {'name': 'asdf'})
self.assertEqual('asdf', result.next_task.data['name'])
workflow_api = self.get_workflow_api(workflow_api)
self.assertNotIn('name', workflow_api.next_task.data)
workflow_api = self.get_workflow_api(workflow)
result = self.complete_form(workflow_api, first_task, {'formdata': 'asdf'})
self.assertIn('formdata', result.next_task.data)
self.assertEqual('asdf', result.next_task.data['formdata'])
workflow_api = self.get_workflow_api(workflow_api, clear_data=True)
self.assertNotIn('formdata', workflow_api.next_task.data)
print('Nice Test')