From e13e703825574de7167d3fee3a9ad45979feff84 Mon Sep 17 00:00:00 2001 From: jasquat Date: Fri, 10 Mar 2023 13:29:23 -0500 Subject: [PATCH] check task data on spiff tasks --- .../services/process_instance_processor.py | 7 ---- .../unit/test_process_instance_processor.py | 41 ++++++++++++++----- 2 files changed, 30 insertions(+), 18 deletions(-) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index a68d7d09f..3b9c43aca 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -1061,16 +1061,9 @@ class ProcessInstanceProcessor: # if self.process_instance_model.bpmn_process_definition_id is None: self._add_bpmn_process_definitions(bpmn_spec_dict) - # FIXME: Update tasks in the did_complete_task instead to set the final info. - # We will need to somehow cache all tasks initially though before each task is run. - # Maybe always do this for first run - just need to know it's the first run. - # import pdb; pdb.set_trace() - # if self.process_instance_model.bpmn_process_id is None: subprocesses = process_instance_data_dict.pop("subprocesses") bpmn_process_parent = TaskService.add_bpmn_process(process_instance_data_dict, self.process_instance_model) for subprocess_task_id, subprocess_properties in subprocesses.items(): - # import pdb; pdb.set_trace() - print(f"subprocess_task_id: {subprocess_task_id}") TaskService.add_bpmn_process( subprocess_properties, self.process_instance_model, diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py index 63b4fa4f9..5e586fa08 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py @@ -333,26 +333,45 @@ class TestProcessInstanceProcessor(BaseTest): processor = ProcessInstanceProcessor(process_instance) human_task_one = process_instance.active_human_tasks[0] - spiff_task = processor.__class__.get_task_by_bpmn_identifier( + spiff_manual_task = processor.__class__.get_task_by_bpmn_identifier( human_task_one.task_name, processor.bpmn_process_instance ) ProcessInstanceService.complete_form_task( - processor, spiff_task, {}, initiator_user, human_task_one + processor, spiff_manual_task, {}, initiator_user, human_task_one ) + # recreate variables to ensure all bpmn json was recreated from scratch from the db process_instance_relookup = ProcessInstanceModel.query.filter_by( id=process_instance.id ).first() - processor = ProcessInstanceProcessor(process_instance_relookup) + processor_final = ProcessInstanceProcessor(process_instance_relookup) assert process_instance_relookup.status == "complete" - task = TaskModel.query.filter_by(guid=human_task_one.task_id).first() - assert task.state == "COMPLETED" - end_event_spiff_task = processor.__class__.get_task_by_bpmn_identifier( - "end_event_of_manual_task_model", processor.bpmn_process_instance - ) - assert end_event_spiff_task - assert end_event_spiff_task.state == TaskState.COMPLETED - # # NOTE: also check the spiff task from the new processor + + first_data_set = {'set_in_top_level_script': 1} + second_data_set = {**first_data_set, **{'set_in_top_level_subprocess': 1}} + third_data_set = {**second_data_set, **{'set_in_test_process_to_call_script': 1}} + expected_task_data = { + "top_level_script": first_data_set, + "manual_task": first_data_set, + "top_level_subprocess_script": second_data_set, + "top_level_subprocess": second_data_set, + "test_process_to_call_script": third_data_set, + "top_level_call_activity": third_data_set, + "end_event_of_manual_task_model": third_data_set, + } + + all_spiff_tasks = processor_final.bpmn_process_instance.get_tasks() + assert len(all_spiff_tasks) > 1 + for spiff_task in all_spiff_tasks: + assert spiff_task.state == TaskState.COMPLETED + spiff_task_name = spiff_task.task_spec.name + if spiff_task_name in expected_task_data: + spiff_task_data = expected_task_data[spiff_task_name] + failure_message = ( + f"Found unexpected task data on {spiff_task_name}. " + f"Expected: {spiff_task_data}, Found: {spiff_task.data}" + ) + assert spiff_task.data == spiff_task_data, failure_message def test_does_not_recreate_human_tasks_on_multiple_saves( self,