Merge pull request #224 from sartography/restart-with-no-data-99

Restart with no data 99
This commit is contained in:
Dan Funk 2021-01-20 17:22:18 -05:00 committed by GitHub
commit 5b13195182
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 137 additions and 115 deletions

View File

@ -850,20 +850,8 @@ paths:
format: int32
get:
operationId: crc.api.workflow.get_workflow
summary: Returns a workflow, can also be used to do a soft or hard reset on the workflow.
summary: Returns a workflow.
parameters:
- name: soft_reset
in: query
required: false
description: Set this to true to use the latest workflow specification to load minor modifications to the spec.
schema:
type: boolean
- name: hard_reset
in: query
required: false
description: Set this to true to reset the workflow
schema:
type: boolean
- name: do_engine_steps
in: query
required: false
@ -889,6 +877,35 @@ paths:
responses:
'204':
description: The workflow was removed
/workflow/{workflow_id}/restart:
parameters:
- name: workflow_id
in: path
required: true
description: The id of the workflow
schema:
type: integer
format: int32
get:
operationId: crc.api.workflow.restart_workflow
summary: Restarts a workflow with the latest spec. Can also clear data.
parameters:
- name: clear_data
in: query
required: false
description: Set this to true to clear data when starting workflow.
schema:
type: boolean
tags:
- Workflows and Tasks
responses:
'200':
description: Returns updated workflow, possibly without data.
content:
application/json:
schema:
$ref: "#/components/schemas/Workflow"
/workflow/{workflow_id}/task/{task_id}/data:
parameters:
- name: workflow_id

View File

@ -96,30 +96,29 @@ def delete_workflow_specification(spec_id):
session.commit()
def get_workflow(workflow_id, soft_reset=False, hard_reset=False, do_engine_steps=True):
"""Soft reset will attempt to update to the latest spec without starting over,
Hard reset will update to the latest spec and start from the beginning.
Read Only will return the workflow in a read only state, without running any
engine tasks or logging any events. """
def get_workflow(workflow_id, do_engine_steps=True):
"""Retrieve workflow based on workflow_id, and return it in the last saved State.
If do_engine_steps is False, return the workflow without running any engine tasks or logging any events. """
workflow_model: WorkflowModel = session.query(WorkflowModel).filter_by(id=workflow_id).first()
processor = WorkflowProcessor(workflow_model)
if soft_reset or hard_reset:
try:
processor.cancel_notify()
except Exception as e:
raise e
finally:
# In the event of a reset, ALWAYS allow the reset, even if the cancel_notify fails for some reason.
processor = WorkflowProcessor(workflow_model, soft_reset=soft_reset, hard_reset=hard_reset)
if do_engine_steps:
processor.do_engine_steps()
processor.save()
WorkflowService.update_task_assignments(processor)
workflow_api_model = WorkflowService.processor_to_workflow_api(processor)
return WorkflowApiSchema().dump(workflow_api_model)
def restart_workflow(workflow_id, clear_data=False):
"""Restart a workflow with the latest spec.
Clear data allows user to restart the workflow without previous data."""
workflow_model: WorkflowModel = session.query(WorkflowModel).filter_by(id=workflow_id).first()
WorkflowProcessor.reset(workflow_model, clear_data=clear_data)
return get_workflow(workflow_model.id)
def get_task_events(action = None, workflow = None, study = None):
"""Provides a way to see a history of what has happened, or get a list of tasks that need your attention."""
query = session.query(TaskEventModel).filter(TaskEventModel.user_uid == g.user.uid)

View File

@ -21,6 +21,7 @@ import crc
from crc import session, app
from crc.api.common import ApiError
from crc.models.file import FileDataModel, FileModel, FileType
from crc.models.task_event import TaskEventModel
from crc.models.workflow import WorkflowStatus, WorkflowModel, WorkflowSpecDependencyFile
from crc.scripts.script import Script
from crc.services.file_service import FileService
@ -150,19 +151,12 @@ class WorkflowProcessor(object):
STUDY_ID_KEY = "study_id"
VALIDATION_PROCESS_KEY = "validate_only"
def __init__(self, workflow_model: WorkflowModel,
soft_reset=False, hard_reset=False, validate_only=False):
"""Create a Workflow Processor based on the serialized information available in the workflow model.
If soft_reset is set to true, it will try to use the latest version of the workflow specification
without resetting to the beginning of the workflow. This will work for some minor changes to the spec.
If hard_reset is set to true, it will use the latest spec, and start the workflow over from the beginning.
which should work in casees where a soft reset fails.
If neither flag is set, it will use the same version of the specification that was used to originally
create the workflow model. """
def __init__(self, workflow_model: WorkflowModel, validate_only=False):
"""Create a Workflow Processor based on the serialized information available in the workflow model."""
self.workflow_model = workflow_model
if soft_reset or len(workflow_model.dependencies) == 0: # Depenencies of 0 means the workflow was never started.
if workflow_model.bpmn_workflow_json is None: # The workflow was never started.
self.spec_data_files = FileService.get_spec_data_files(
workflow_spec_id=workflow_model.workflow_spec_id)
spec = self.get_spec(self.spec_data_files, workflow_model.workflow_spec_id)
@ -187,22 +181,14 @@ class WorkflowProcessor(object):
# can then load data as needed.
self.bpmn_workflow.data[WorkflowProcessor.WORKFLOW_ID_KEY] = workflow_model.id
workflow_model.bpmn_workflow_json = WorkflowProcessor._serializer.serialize_workflow(
self.bpmn_workflow,include_spec=True)
self.bpmn_workflow, include_spec=True)
self.save()
except MissingSpecError as ke:
raise ApiError(code="unexpected_workflow_structure",
message="Failed to deserialize workflow"
" '%s' version %s, due to a mis-placed or missing task '%s'" %
(self.workflow_spec_id, self.get_version_string(), str(ke)) +
" This is very likely due to a soft reset where there was a structural change.")
if hard_reset:
# Now that the spec is loaded, get the data and rebuild the bpmn with the new details
self.hard_reset()
workflow_model.bpmn_workflow_json = WorkflowProcessor._serializer.serialize_workflow(self.bpmn_workflow)
self.save()
if soft_reset:
self.save()
(self.workflow_spec_id, self.get_version_string(), str(ke)))
# set whether this is the latest spec file.
if self.spec_data_files == FileService.get_spec_data_files(workflow_spec_id=workflow_model.workflow_spec_id):
@ -210,6 +196,21 @@ class WorkflowProcessor(object):
else:
self.is_latest_spec = False
@classmethod
def reset(cls, workflow_model, clear_data=False):
print('WorkflowProcessor: reset: ')
workflow_model.bpmn_workflow_json = None
if clear_data:
# Clear form_data from task_events
task_events = session.query(TaskEventModel). \
filter(TaskEventModel.workflow_id == workflow_model.id).all()
for task_event in task_events:
task_event.form_data = {}
session.add(task_event)
session.commit()
return cls(workflow_model)
def __get_bpmn_workflow(self, workflow_model: WorkflowModel, spec: WorkflowSpec, validate_only=False):
if workflow_model.bpmn_workflow_json:
bpmn_workflow = self._serializer.deserialize_workflow(workflow_model.bpmn_workflow_json,
@ -355,18 +356,18 @@ class WorkflowProcessor(object):
else:
return WorkflowStatus.waiting
def hard_reset(self):
"""Recreate this workflow. This will be useful when a workflow specification changes.
"""
self.spec_data_files = FileService.get_spec_data_files(workflow_spec_id=self.workflow_spec_id)
new_spec = WorkflowProcessor.get_spec(self.spec_data_files, self.workflow_spec_id)
new_bpmn_workflow = BpmnWorkflow(new_spec, script_engine=self._script_engine)
new_bpmn_workflow.data = self.bpmn_workflow.data
try:
new_bpmn_workflow.do_engine_steps()
except WorkflowException as we:
raise ApiError.from_task_spec("hard_reset_engine_steps_error", str(we), we.sender)
self.bpmn_workflow = new_bpmn_workflow
# def hard_reset(self):
# """Recreate this workflow. This will be useful when a workflow specification changes.
# """
# self.spec_data_files = FileService.get_spec_data_files(workflow_spec_id=self.workflow_spec_id)
# new_spec = WorkflowProcessor.get_spec(self.spec_data_files, self.workflow_spec_id)
# new_bpmn_workflow = BpmnWorkflow(new_spec, script_engine=self._script_engine)
# new_bpmn_workflow.data = self.bpmn_workflow.data
# try:
# new_bpmn_workflow.do_engine_steps()
# except WorkflowException as we:
# raise ApiError.from_task_spec("hard_reset_engine_steps_error", str(we), we.sender)
# self.bpmn_workflow = new_bpmn_workflow
def get_status(self):
return self.status_of(self.bpmn_workflow)

View File

@ -341,18 +341,30 @@ class BaseTest(unittest.TestCase):
session.commit()
return approval
def get_workflow_api(self, workflow, soft_reset=False, hard_reset=False, do_engine_steps=True, user_uid="dhf8r"):
user = session.query(UserModel).filter_by(uid=user_uid).first()
self.assertIsNotNone(user)
rv = self.app.get(f'/v1.0/workflow/{workflow.id}'
f'?soft_reset={str(soft_reset)}'
f'&hard_reset={str(hard_reset)}'
f'&do_engine_steps={str(do_engine_steps)}',
def get_workflow_common(self, url, user):
rv = self.app.get(url,
headers=self.logged_in_headers(user),
content_type="application/json")
self.assert_success(rv)
json_data = json.loads(rv.get_data(as_text=True))
workflow_api = WorkflowApiSchema().load(json_data)
return workflow_api
def get_workflow_api(self, workflow, do_engine_steps=True, user_uid="dhf8r"):
user = session.query(UserModel).filter_by(uid=user_uid).first()
self.assertIsNotNone(user)
url = (f'/v1.0/workflow/{workflow.id}'
f'?do_engine_steps={str(do_engine_steps)}')
workflow_api = self.get_workflow_common(url, user)
self.assertEqual(workflow.workflow_spec_id, workflow_api.workflow_spec_id)
return workflow_api
def restart_workflow_api(self, workflow, clear_data=False, user_uid="dhf8r"):
user = session.query(UserModel).filter_by(uid=user_uid).first()
self.assertIsNotNone(user)
url = (f'/v1.0/workflow/{workflow.id}/restart'
f'?clear_data={str(clear_data)}')
workflow_api = self.get_workflow_common(url, user)
self.assertEqual(workflow.workflow_spec_id, workflow_api.workflow_spec_id)
return workflow_api

View File

@ -53,7 +53,9 @@ class TestLookupService(BaseTest):
file.close()
# restart the workflow, so it can pick up the changes.
WorkflowProcessor(workflow, soft_reset=True)
processor = WorkflowProcessor.reset(workflow)
workflow = processor.workflow_model
LookupService.lookup(workflow, "sponsor", "sam", limit=10)
lookup_records = session.query(LookupFileModel).all()

View File

@ -204,7 +204,7 @@ class TestTasksApi(BaseTest):
self.assertTrue(workflow_api.spec_version.startswith("v1 "))
self.assertFalse(workflow_api.is_latest_spec)
workflow_api = self.get_workflow_api(workflow, hard_reset=True)
workflow_api = self.restart_workflow_api(workflow_api, clear_data=True)
self.assertTrue(workflow_api.spec_version.startswith("v2 "))
self.assertTrue(workflow_api.is_latest_spec)
@ -213,30 +213,6 @@ class TestTasksApi(BaseTest):
self.assertTrue(workflow_api.spec_version.startswith("v2 "))
self.assertTrue(workflow_api.is_latest_spec)
def test_soft_reset_errors_out_and_next_result_is_on_original_version(self):
# Start the basic two_forms workflow and complete a task.
workflow = self.create_workflow('two_forms')
workflow_api = self.get_workflow_api(workflow)
self.complete_form(workflow, workflow_api.next_task, {"color": "blue"})
self.assertTrue(workflow_api.is_latest_spec)
# Modify the specification, with a major change that alters the flow and can't be deserialized
# effectively, if it uses the latest spec files.
file_path = os.path.join(app.root_path, '..', 'tests', 'data', 'two_forms', 'modified', 'two_forms_struc_mod.bpmn')
self.replace_file("two_forms.bpmn", file_path)
# perform a soft reset returns an error
rv = self.app.get('/v1.0/workflow/%i?soft_reset=%s&hard_reset=%s' %
(workflow.id, "true", "false"),
content_type="application/json",
headers=self.logged_in_headers())
self.assert_failure(rv, error_code="unexpected_workflow_structure")
# Try again without a soft reset, and we are still ok, and on the original version.
workflow_api = self.get_workflow_api(workflow)
self.assertTrue(workflow_api.spec_version.startswith("v1 "))
self.assertFalse(workflow_api.is_latest_spec)
def test_manual_task_with_external_documentation(self):
workflow = self.create_workflow('manual_task_with_external_documentation')

View File

@ -228,7 +228,8 @@ class TestTasksApi(BaseTest):
self.assertEqual(1, len(self.get_assignment_task_events(supervisor.uid)))
# Resetting the workflow at this point should clear the event log.
workflow_api = self.get_workflow_api(workflow, hard_reset=True, user_uid=submitter.uid)
workflow_api = self.restart_workflow_api(workflow, user_uid=submitter.uid)
workflow_api = self.get_workflow_api(workflow, user_uid=submitter.uid)
self.assertEqual(0, len(self.get_assignment_task_events(supervisor.uid)))
# Re-complete first task, and awaiting tasks should shift to 0 for for submitter, and 1 for supervisor

View File

@ -173,28 +173,6 @@ class TestWorkflowProcessor(BaseTest):
self.assertEqual("workflow_validation_error", context.exception.code)
self.assertTrue("bpmn:startEvent" in context.exception.message)
def test_workflow_spec_key_error(self):
"""Frequently seeing errors in the logs about a 'Key' error, where a workflow
references something that doesn't exist in the midst of processing. Want to
make sure we produce errors to the front end that allows us to debug this."""
# Start the two_forms workflow, and enter some data in the first form.
self.load_example_data()
study = session.query(StudyModel).first()
workflow_spec_model = self.load_test_spec("two_forms")
processor = self.get_processor(study, workflow_spec_model)
self.assertEqual(processor.workflow_model.workflow_spec_id, workflow_spec_model.id)
task = processor.next_task()
task.data = {"color": "blue"}
processor.complete_task(task)
# Modify the specification, with a major change.
file_path = os.path.join(app.root_path, '..', 'tests', 'data', 'two_forms', 'modified', 'two_forms_struc_mod.bpmn')
self.replace_file("two_forms.bpmn", file_path)
# Attempting a soft update on a structural change should raise a sensible error.
with self.assertRaises(ApiError) as context:
processor3 = WorkflowProcessor(processor.workflow_model, soft_reset=True)
self.assertEqual("unexpected_workflow_structure", context.exception.code)
def test_workflow_with_bad_expression_raises_sensible_error(self):
self.load_example_data()
@ -301,7 +279,9 @@ class TestWorkflowProcessor(BaseTest):
self.assertFalse(processor2.is_latest_spec) # Still at version 1.
# Do a hard reset, which should bring us back to the beginning, but retain the data.
processor3 = WorkflowProcessor(processor.workflow_model, hard_reset=True)
WorkflowProcessor.reset(processor2.workflow_model)
processor3 = WorkflowProcessor(processor.workflow_model)
processor3.do_engine_steps()
self.assertEqual("Step 1", processor3.next_task().task_spec.description)
self.assertTrue(processor3.is_latest_spec) # Now at version 2.
task = processor3.next_task()

View File

@ -0,0 +1,34 @@
from tests.base_test import BaseTest
class TestMessageEvent(BaseTest):
def test_message_event(self):
workflow = self.create_workflow('message_event')
first_task = self.get_workflow_api(workflow).next_task
self.assertEqual('Activity_GetData', first_task.name)
workflow_api = self.get_workflow_api(workflow)
result = self.complete_form(workflow_api, first_task, {'formdata': 'asdf'})
self.assertIn('formdata', result.next_task.data)
self.assertEqual('asdf', result.next_task.data['formdata'])
workflow_api = self.get_workflow_api(workflow)
self.assertEqual('Activity_HowMany', self.get_workflow_api(workflow_api).next_task.name)
# restart with data. should land at beginning with data
workflow_api = self.restart_workflow_api(result)
first_task = self.get_workflow_api(workflow_api).next_task
self.assertEqual('Activity_GetData', first_task.name)
self.assertIn('formdata', workflow_api.next_task.data)
self.assertEqual('asdf', workflow_api.next_task.data['formdata'])
# restart without data.
workflow_api = self.restart_workflow_api(workflow_api, clear_data=True)
first_task = self.get_workflow_api(workflow).next_task
self.assertEqual('Activity_GetData', first_task.name)
self.assertNotIn('formdata', workflow_api.next_task.data)
print('Nice Test')