check task data on spiff tasks

This commit is contained in:
jasquat 2023-03-10 13:29:23 -05:00
parent 3fe098c6d9
commit e13e703825
2 changed files with 30 additions and 18 deletions

View File

@ -1061,16 +1061,9 @@ class ProcessInstanceProcessor:
# if self.process_instance_model.bpmn_process_definition_id is None: # if self.process_instance_model.bpmn_process_definition_id is None:
self._add_bpmn_process_definitions(bpmn_spec_dict) self._add_bpmn_process_definitions(bpmn_spec_dict)
# FIXME: Update tasks in the did_complete_task instead to set the final info.
# We will need to somehow cache all tasks initially though before each task is run.
# Maybe always do this for first run - just need to know it's the first run.
# import pdb; pdb.set_trace()
# if self.process_instance_model.bpmn_process_id is None:
subprocesses = process_instance_data_dict.pop("subprocesses") subprocesses = process_instance_data_dict.pop("subprocesses")
bpmn_process_parent = TaskService.add_bpmn_process(process_instance_data_dict, self.process_instance_model) bpmn_process_parent = TaskService.add_bpmn_process(process_instance_data_dict, self.process_instance_model)
for subprocess_task_id, subprocess_properties in subprocesses.items(): for subprocess_task_id, subprocess_properties in subprocesses.items():
# import pdb; pdb.set_trace()
print(f"subprocess_task_id: {subprocess_task_id}")
TaskService.add_bpmn_process( TaskService.add_bpmn_process(
subprocess_properties, subprocess_properties,
self.process_instance_model, self.process_instance_model,

View File

@ -333,26 +333,45 @@ class TestProcessInstanceProcessor(BaseTest):
processor = ProcessInstanceProcessor(process_instance) processor = ProcessInstanceProcessor(process_instance)
human_task_one = process_instance.active_human_tasks[0] human_task_one = process_instance.active_human_tasks[0]
spiff_task = processor.__class__.get_task_by_bpmn_identifier( spiff_manual_task = processor.__class__.get_task_by_bpmn_identifier(
human_task_one.task_name, processor.bpmn_process_instance human_task_one.task_name, processor.bpmn_process_instance
) )
ProcessInstanceService.complete_form_task( ProcessInstanceService.complete_form_task(
processor, spiff_task, {}, initiator_user, human_task_one processor, spiff_manual_task, {}, initiator_user, human_task_one
) )
# recreate variables to ensure all bpmn json was recreated from scratch from the db
process_instance_relookup = ProcessInstanceModel.query.filter_by( process_instance_relookup = ProcessInstanceModel.query.filter_by(
id=process_instance.id id=process_instance.id
).first() ).first()
processor = ProcessInstanceProcessor(process_instance_relookup) processor_final = ProcessInstanceProcessor(process_instance_relookup)
assert process_instance_relookup.status == "complete" assert process_instance_relookup.status == "complete"
task = TaskModel.query.filter_by(guid=human_task_one.task_id).first()
assert task.state == "COMPLETED" first_data_set = {'set_in_top_level_script': 1}
end_event_spiff_task = processor.__class__.get_task_by_bpmn_identifier( second_data_set = {**first_data_set, **{'set_in_top_level_subprocess': 1}}
"end_event_of_manual_task_model", processor.bpmn_process_instance third_data_set = {**second_data_set, **{'set_in_test_process_to_call_script': 1}}
expected_task_data = {
"top_level_script": first_data_set,
"manual_task": first_data_set,
"top_level_subprocess_script": second_data_set,
"top_level_subprocess": second_data_set,
"test_process_to_call_script": third_data_set,
"top_level_call_activity": third_data_set,
"end_event_of_manual_task_model": third_data_set,
}
all_spiff_tasks = processor_final.bpmn_process_instance.get_tasks()
assert len(all_spiff_tasks) > 1
for spiff_task in all_spiff_tasks:
assert spiff_task.state == TaskState.COMPLETED
spiff_task_name = spiff_task.task_spec.name
if spiff_task_name in expected_task_data:
spiff_task_data = expected_task_data[spiff_task_name]
failure_message = (
f"Found unexpected task data on {spiff_task_name}. "
f"Expected: {spiff_task_data}, Found: {spiff_task.data}"
) )
assert end_event_spiff_task assert spiff_task.data == spiff_task_data, failure_message
assert end_event_spiff_task.state == TaskState.COMPLETED
# # NOTE: also check the spiff task from the new processor
def test_does_not_recreate_human_tasks_on_multiple_saves( def test_does_not_recreate_human_tasks_on_multiple_saves(
self, self,