From 1db940116613001faeada34903243741b6c790f8 Mon Sep 17 00:00:00 2001
From: Dan Funk
Date: Mon, 1 Jun 2020 17:42:28 -0400
Subject: [PATCH] Don't put all the data into Spiff Tasks on a reload or
 backtrack; just store the data that gets submitted each time in the task
 log, and use that. This should correct issues with parallel tasks and other
 complex areas, so we don't have tasks seeing data that isn't along their
 path.

---
 crc/api/workflow.py                  | 44 +++++++++++++++++++-------
 crc/models/api_models.py             |  7 ++---
 crc/models/stats.py                  |  1 +
 crc/services/workflow_processor.py   |  2 +-
 crc/services/workflow_service.py     |  9 +++---
 migrations/versions/3876e130664e_.py | 28 +++++++++++++++++
 tests/test_tasks_api.py              |  4 ++-
 tests/test_workflow_processor.py     | 47 ----------------------------
 8 files changed, 73 insertions(+), 69 deletions(-)
 create mode 100644 migrations/versions/3876e130664e_.py

diff --git a/crc/api/workflow.py b/crc/api/workflow.py
index 81252056..46befa20 100644
--- a/crc/api/workflow.py
+++ b/crc/api/workflow.py
@@ -1,5 +1,7 @@
 import uuid
 
+from SpiffWorkflow.util.deep_merge import DeepMerge
+
 from crc import session
 from crc.api.common import ApiError, ApiErrorSchema
 from crc.models.api_models import WorkflowApi, WorkflowApiSchema, NavigationItem, NavigationItemSchema
@@ -132,12 +134,19 @@ def __get_workflow_api_model(processor: WorkflowProcessor, next_task = None):
        total_tasks=processor.workflow_model.total_tasks,
        completed_tasks=processor.workflow_model.completed_tasks,
        last_updated=processor.workflow_model.last_updated,
-        title=spec.display_name
     )
     if not next_task:  # The Next Task can be requested to be a certain task, useful for parallel tasks.
         # This may or may not work, sometimes there is no next task to complete.
         next_task = processor.next_task()
     if next_task:
+        latest_event = session.query(TaskEventModel) \
+            .filter_by(workflow_id=processor.workflow_model.id) \
+            .filter_by(task_name=next_task.task_spec.name) \
+            .filter_by(action=WorkflowService.TASK_ACTION_COMPLETE) \
+            .order_by(TaskEventModel.date.desc()).first()
+        if latest_event:
+            next_task.data = DeepMerge.merge(next_task.data, latest_event.task_data)
+
         workflow_api.next_task = WorkflowService.spiff_task_to_api_task(next_task, add_docs_and_forms=True)
     return workflow_api

@@ -158,17 +167,22 @@ def set_current_task(workflow_id, task_id):
     workflow_model = session.query(WorkflowModel).filter_by(id=workflow_id).first()
     processor = WorkflowProcessor(workflow_model)
     task_id = uuid.UUID(task_id)
-    task = processor.bpmn_workflow.get_task(task_id)
-    if task.state != task.COMPLETED and task.state != task.READY:
+    spiff_task = processor.bpmn_workflow.get_task(task_id)
+    if spiff_task.state != spiff_task.COMPLETED and spiff_task.state != spiff_task.READY:
         raise ApiError("invalid_state", "You may not move the token to a task who's state is not "
                                         "currently set to COMPLETE or READY.")
     # Only reset the token if the task doesn't already have it.
-    if task.state == task.COMPLETED:
-        task.reset_token(reset_data=False)  # we could optionally clear the previous data.
+    if spiff_task.state == spiff_task.COMPLETED:
+        spiff_task.reset_token(reset_data=True)  # Don't try to copy the existing data back into this task.
+
     processor.save()
-    WorkflowService.log_task_action(processor, task, WorkflowService.TASK_ACTION_TOKEN_RESET)
-    workflow_api_model = __get_workflow_api_model(processor, task)
+    task_api = WorkflowService.spiff_task_to_api_task(spiff_task, add_docs_and_forms=True)
+    WorkflowService.log_task_action(workflow_model, task_api,
+                                    WorkflowService.TASK_ACTION_TOKEN_RESET,
+                                    version = processor.get_version_string())
+
+    workflow_api_model = __get_workflow_api_model(processor, spiff_task)
     return WorkflowApiSchema().dump(workflow_api_model)

@@ -176,15 +190,21 @@ def update_task(workflow_id, task_id, body):
     workflow_model = session.query(WorkflowModel).filter_by(id=workflow_id).first()
     processor = WorkflowProcessor(workflow_model)
     task_id = uuid.UUID(task_id)
-    task = processor.bpmn_workflow.get_task(task_id)
-    if task.state != task.READY:
+    spiff_task = processor.bpmn_workflow.get_task(task_id)
+    if spiff_task.state != spiff_task.READY:
         raise ApiError("invalid_state", "You may not update a task unless it is in the READY state. "
                                         "Consider calling a token reset to make this task Ready.")
-    task.update_data(body)
-    processor.complete_task(task)
+    spiff_task.update_data(body)
+    processor.complete_task(spiff_task)
     processor.do_engine_steps()
     processor.save()
-    WorkflowService.log_task_action(processor, task, WorkflowService.TASK_ACTION_COMPLETE)
+
+    task_api = WorkflowService.spiff_task_to_api_task(spiff_task, add_docs_and_forms=True)
+    WorkflowService.log_task_action(workflow_model,
+                                    task_api,
+                                    WorkflowService.TASK_ACTION_COMPLETE,
+                                    version = processor.get_version_string(),
+                                    updated_data = spiff_task.data)
     workflow_api_model = __get_workflow_api_model(processor)
     return WorkflowApiSchema().dump(workflow_api_model)
diff --git a/crc/models/api_models.py b/crc/models/api_models.py
index b8b535a7..eee6d5f5 100644
--- a/crc/models/api_models.py
+++ b/crc/models/api_models.py
@@ -119,7 +119,7 @@ class NavigationItemSchema(ma.Schema):
 
 class WorkflowApi(object):
     def __init__(self, id, status, next_task, navigation,
-                 spec_version, is_latest_spec, workflow_spec_id, total_tasks, completed_tasks, last_updated, title):
+                 spec_version, is_latest_spec, workflow_spec_id, total_tasks, completed_tasks, last_updated):
         self.id = id
         self.status = status
         self.next_task = next_task  # The next task that requires user input.
@@ -130,14 +130,13 @@ class WorkflowApi(object):
         self.total_tasks = total_tasks
         self.completed_tasks = completed_tasks
         self.last_updated = last_updated
-        self.title = title
 
 
 class WorkflowApiSchema(ma.Schema):
     class Meta:
         model = WorkflowApi
         fields = ["id", "status", "next_task", "navigation",
                   "workflow_spec_id", "spec_version", "is_latest_spec", "total_tasks", "completed_tasks",
-                  "last_updated", "title"]
+                  "last_updated"]
         unknown = INCLUDE
 
     status = EnumField(WorkflowStatus)
@@ -148,7 +147,7 @@ class WorkflowApiSchema(ma.Schema):
     def make_workflow(self, data, **kwargs):
         keys = ['id', 'status', 'next_task', 'navigation',
                 'workflow_spec_id', 'spec_version', 'is_latest_spec', "total_tasks", "completed_tasks",
-                "last_updated", "title"]
+                "last_updated"]
         filtered_fields = {key: data[key] for key in keys}
         filtered_fields['next_task'] = TaskSchema().make_task(data['next_task'])
         return WorkflowApi(**filtered_fields)
diff --git a/crc/models/stats.py b/crc/models/stats.py
index c72df7d4..8912b1d1 100644
--- a/crc/models/stats.py
+++ b/crc/models/stats.py
@@ -17,6 +17,7 @@ class TaskEventModel(db.Model):
     task_title = db.Column(db.String)
     task_type = db.Column(db.String)
     task_state = db.Column(db.String)
+    task_data = db.Column(db.JSON)
     mi_type = db.Column(db.String)
     mi_count = db.Column(db.Integer)
     mi_index = db.Column(db.Integer)
diff --git a/crc/services/workflow_processor.py b/crc/services/workflow_processor.py
index 93590d94..e5cbe0a3 100644
--- a/crc/services/workflow_processor.py
+++ b/crc/services/workflow_processor.py
@@ -315,7 +315,7 @@ class WorkflowProcessor(object):
         # Reset the current workflow to the beginning - which we will consider to be the first task after the root
         # element. This feels a little sketchy, but I think it is safe to assume root will have one child.
         first_task = self.bpmn_workflow.task_tree.children[0]
-        first_task.reset_token(reset_data=False)
+        first_task.reset_token(reset_data=True)  # Clear out the data.
         for task in new_bpmn_workflow.get_tasks(SpiffTask.READY):
             task.data = first_task.data
         new_bpmn_workflow.do_engine_steps()
diff --git a/crc/services/workflow_service.py b/crc/services/workflow_service.py
index 312dee3c..1b34bd56 100644
--- a/crc/services/workflow_service.py
+++ b/crc/services/workflow_service.py
@@ -9,6 +9,7 @@ from SpiffWorkflow.bpmn.specs.ScriptTask import ScriptTask
 from SpiffWorkflow.bpmn.specs.UserTask import UserTask
 from SpiffWorkflow.dmn.specs.BusinessRuleTask import BusinessRuleTask
 from SpiffWorkflow.specs import CancelTask, StartTask
+from SpiffWorkflow.util.deep_merge import DeepMerge
 from flask import g
 from jinja2 import Template
@@ -316,21 +317,21 @@ class WorkflowService(object):
                     field.options.append({"id": d.value, "name": d.label})
 
     @staticmethod
-    def log_task_action(processor, spiff_task, action):
-        task = WorkflowService.spiff_task_to_api_task(spiff_task)
-        workflow_model = processor.workflow_model
+    def log_task_action(workflow_model: WorkflowModel, task: Task,
+                        action: string, version, updated_data=None):
         task_event = TaskEventModel(
             study_id=workflow_model.study_id,
             user_uid=g.user.uid,
             workflow_id=workflow_model.id,
             workflow_spec_id=workflow_model.workflow_spec_id,
-            spec_version=processor.get_version_string(),
+            spec_version=version,
             action=action,
             task_id=task.id,
             task_name=task.name,
             task_title=task.title,
             task_type=str(task.type),
             task_state=task.state,
+            task_data=updated_data,
             mi_type=task.multi_instance_type.value,  # Some tasks have a repeat behavior.
             mi_count=task.multi_instance_count,  # This is the number of times the task could repeat.
             mi_index=task.multi_instance_index,  # And the index of the currently repeating task.
diff --git a/migrations/versions/3876e130664e_.py b/migrations/versions/3876e130664e_.py
new file mode 100644
index 00000000..31e7ce13
--- /dev/null
+++ b/migrations/versions/3876e130664e_.py
@@ -0,0 +1,28 @@
+"""empty message
+
+Revision ID: 3876e130664e
+Revises: 5064b72284b7
+Create Date: 2020-06-01 15:39:53.937591
+
+"""
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = '3876e130664e'
+down_revision = '5064b72284b7'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.add_column('task_event', sa.Column('task_data', sa.JSON(), nullable=True))
+    # ### end Alembic commands ###
+
+
+def downgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.drop_column('task_event', 'task_data')
+    # ### end Alembic commands ###
diff --git a/tests/test_tasks_api.py b/tests/test_tasks_api.py
index 67a644ef..41fd1a3b 100644
--- a/tests/test_tasks_api.py
+++ b/tests/test_tasks_api.py
@@ -10,7 +10,6 @@ from crc.models.api_models import WorkflowApiSchema, MultiInstanceType, TaskSchema
 from crc.models.file import FileModelSchema
 from crc.models.stats import TaskEventModel
 from crc.models.workflow import WorkflowStatus
-from crc.services.protocol_builder import ProtocolBuilderService
 from crc.services.workflow_service import WorkflowService
@@ -79,6 +78,9 @@ class TestTasksApi(BaseTest):
         self.assertEquals(task_in.process_name, event.process_name)
         self.assertIsNotNone(event.date)
 
+        # Assure that the data provided occurs in the task data log.
+        for key in dict_data.keys():
+            self.assertIn(key, event.task_data)
         workflow = WorkflowApiSchema().load(json_data)
         return workflow
diff --git a/tests/test_workflow_processor.py b/tests/test_workflow_processor.py
index b3f6c374..1f8beebf 100644
--- a/tests/test_workflow_processor.py
+++ b/tests/test_workflow_processor.py
@@ -270,53 +270,6 @@ class TestWorkflowProcessor(BaseTest):
         processor = self.get_processor(study, workflow_spec_model)
         self.assertTrue(processor.get_version_string().startswith('v2.1.1'))
 
-    def test_restart_workflow(self):
-        self.load_example_data()
-        study = session.query(StudyModel).first()
-        workflow_spec_model = self.load_test_spec("two_forms")
-        processor = self.get_processor(study, workflow_spec_model)
-        self.assertEqual(processor.workflow_model.workflow_spec_id, workflow_spec_model.id)
-        task = processor.next_task()
-        task.data = {"key": "Value"}
-        processor.complete_task(task)
-        task_before_restart = processor.next_task()
-        processor.hard_reset()
-        task_after_restart = processor.next_task()
-
-        self.assertNotEqual(task.get_name(), task_before_restart.get_name())
-        self.assertEqual(task.get_name(), task_after_restart.get_name())
-        self.assertEqual(task.data, task_after_restart.data)
-
-    def test_soft_reset(self):
-        self.load_example_data()
-
-        # Start the two_forms workflow, and enter some data in the first form.
-        study = session.query(StudyModel).first()
-        workflow_spec_model = self.load_test_spec("two_forms")
-        processor = self.get_processor(study, workflow_spec_model)
-        self.assertEqual(processor.workflow_model.workflow_spec_id, workflow_spec_model.id)
-        task = processor.next_task()
-        task.data = {"color": "blue"}
-        processor.complete_task(task)
-
-        # Modify the specification, with a minor text change.
-        file_path = os.path.join(app.root_path, '..', 'tests', 'data', 'two_forms', 'mods', 'two_forms_text_mod.bpmn')
-        self.replace_file("two_forms.bpmn", file_path)
-
-        # Setting up another processor should not error out, but doesn't pick up the update.
-        processor.workflow_model.bpmn_workflow_json = processor.serialize()
-        processor2 = WorkflowProcessor(processor.workflow_model)
-        self.assertEqual("Step 1", processor2.bpmn_workflow.last_task.task_spec.description)
-        self.assertNotEqual("# This is some documentation I wanted to add.",
-                            processor2.bpmn_workflow.last_task.task_spec.documentation)
-
-        # You can do a soft update and get the right response.
-        processor3 = WorkflowProcessor(processor.workflow_model, soft_reset=True)
-        self.assertEqual("Step 1", processor3.bpmn_workflow.last_task.task_spec.description)
-        self.assertEqual("# This is some documentation I wanted to add.",
-                         processor3.bpmn_workflow.last_task.task_spec.documentation)
-
-
     def test_hard_reset(self):
         self.load_example_data()
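
For orientation, a minimal sketch of the round trip this patch sets up: submitted form data is logged on TaskEventModel.task_data when a task completes, and merged back into the next task via DeepMerge on a reload or backtrack. The two helper functions below are hypothetical (not code from the repository); they simply restate the update_task and __get_workflow_api_model logic shown above using the names the patch itself introduces.

# Sketch only: hypothetical helpers restating the patch's logic, not part of the patch.
from SpiffWorkflow.util.deep_merge import DeepMerge

from crc import session
from crc.models.stats import TaskEventModel
from crc.services.workflow_service import WorkflowService


def log_submitted_data(workflow_model, processor, spiff_task, submitted_data):
    """On task completion, record only the data the user just submitted."""
    spiff_task.update_data(submitted_data)
    processor.complete_task(spiff_task)
    processor.do_engine_steps()
    processor.save()
    task_api = WorkflowService.spiff_task_to_api_task(spiff_task, add_docs_and_forms=True)
    WorkflowService.log_task_action(workflow_model, task_api,
                                    WorkflowService.TASK_ACTION_COMPLETE,
                                    version=processor.get_version_string(),
                                    updated_data=spiff_task.data)  # stored in TaskEventModel.task_data


def rehydrate_next_task(processor, next_task):
    """On reload or backtrack, fold the last submitted data back into the next task."""
    latest_event = session.query(TaskEventModel) \
        .filter_by(workflow_id=processor.workflow_model.id) \
        .filter_by(task_name=next_task.task_spec.name) \
        .filter_by(action=WorkflowService.TASK_ACTION_COMPLETE) \
        .order_by(TaskEventModel.date.desc()).first()
    if latest_event:
        next_task.data = DeepMerge.merge(next_task.data, latest_event.task_data)
    return next_task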