From b35ab336f53bdcd013754c3bb9d5dfc5c6fc8ee8 Mon Sep 17 00:00:00 2001 From: jasquat <2487833+jasquat@users.noreply.github.com> Date: Wed, 3 Apr 2024 16:17:14 +0000 Subject: [PATCH] Omit completed subprocesses (#1327) * updates to avoid loading completed subprocesses when loading a processor w/ burnettk * bpmn unit test generator in controller test is passing w/ burnettk * most tests are passing again w/ burnettk * tests are all passing now w/ burnettk * added assertion to reset pi test to ensure task data does come back w/ burnettk * removed debug benchmark stuff w/ burnettk * pyl w/ burnettk --------- Co-authored-by: jasquat --- spiffworkflow-backend/poetry.lock | 2 +- .../data_migrations/version_4.py | 4 ++- .../helpers/benchmarking.py | 14 +++++++++ .../routes/process_models_controller.py | 4 ++- .../services/process_instance_processor.py | 31 ++++++++++++++----- .../integration/test_process_api.py | 3 +- .../unit/test_process_instance_migrator.py | 8 +++-- .../unit/test_process_instance_processor.py | 13 ++++---- 8 files changed, 59 insertions(+), 20 deletions(-) create mode 100644 spiffworkflow-backend/src/spiffworkflow_backend/helpers/benchmarking.py diff --git a/spiffworkflow-backend/poetry.lock b/spiffworkflow-backend/poetry.lock index 66f7df313..3245be31b 100644 --- a/spiffworkflow-backend/poetry.lock +++ b/spiffworkflow-backend/poetry.lock @@ -3031,7 +3031,7 @@ doc = ["sphinx", "sphinx_rtd_theme"] type = "git" url = "https://github.com/sartography/SpiffWorkflow" reference = "main" -resolved_reference = "86cb84d29cb25c1a5a407a702ef35cf7b469df6b" +resolved_reference = "c1150c68ae0eb342ed13b57544e1bb0749255cb5" [[package]] name = "spiffworkflow-connector-command" diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/data_migrations/version_4.py b/spiffworkflow-backend/src/spiffworkflow_backend/data_migrations/version_4.py index 4add55e1e..797719c5e 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/data_migrations/version_4.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/data_migrations/version_4.py @@ -16,7 +16,9 @@ class Version4(DataMigrationBase): def run(cls, process_instance: ProcessInstanceModel) -> None: # return None try: - processor = ProcessInstanceProcessor(process_instance) + processor = ProcessInstanceProcessor( + process_instance, include_task_data_for_completed_tasks=True, include_completed_subprocesses=True + ) bpmn_process_dict = processor.serialize() update_data_objects(bpmn_process_dict) ProcessInstanceProcessor.persist_bpmn_process_dict( diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/helpers/benchmarking.py b/spiffworkflow-backend/src/spiffworkflow_backend/helpers/benchmarking.py new file mode 100644 index 000000000..c9ae40ee0 --- /dev/null +++ b/spiffworkflow-backend/src/spiffworkflow_backend/helpers/benchmarking.py @@ -0,0 +1,14 @@ +import time +from collections.abc import Generator +from contextlib import contextmanager + +from flask import current_app + + +@contextmanager +def benchmark(message: str) -> Generator: + """Benchmark method useful for debugging slow stuff.""" + t1 = time.perf_counter() + yield + t2 = time.perf_counter() + current_app.logger.debug(f"{message}, Time={t2 - t1}") diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_models_controller.py b/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_models_controller.py index 67cfed72a..c261a1884 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_models_controller.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_models_controller.py @@ -367,7 +367,9 @@ def process_model_test_generate(modified_process_model_identifier: str, body: di test_case_identifier = body.get("test_case_identifier", f"test_case_for_process_instance_{process_instance_id}") process_instance = _find_process_instance_by_id_or_raise(int(process_instance_id)) - processor = ProcessInstanceProcessor(process_instance, include_task_data_for_completed_tasks=True) + processor = ProcessInstanceProcessor( + process_instance, include_task_data_for_completed_tasks=True, include_completed_subprocesses=True + ) process_instance_dict = processor.serialize() test_case_dict = ProcessModelTestGeneratorService.generate_test_from_process_instance_dict( process_instance_dict, test_case_identifier=str(test_case_identifier) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index f662a9292..e5bdaba09 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -421,6 +421,7 @@ class ProcessInstanceProcessor: process_id_to_run: str | None = None, additional_processing_identifier: str | None = None, include_task_data_for_completed_tasks: bool = False, + include_completed_subprocesses: bool = False, ) -> None: """Create a Workflow Processor based on the serialized information available in the process_instance model.""" self._script_engine = script_engine or self.__class__._default_script_engine @@ -430,6 +431,7 @@ class ProcessInstanceProcessor: process_instance_model=process_instance_model, process_id_to_run=process_id_to_run, include_task_data_for_completed_tasks=include_task_data_for_completed_tasks, + include_completed_subprocesses=include_completed_subprocesses, ) def setup_processor_with_process_instance( @@ -437,6 +439,7 @@ class ProcessInstanceProcessor: process_instance_model: ProcessInstanceModel, process_id_to_run: str | None = None, include_task_data_for_completed_tasks: bool = False, + include_completed_subprocesses: bool = False, ) -> None: tld = current_app.config["THREAD_LOCAL_DATA"] tld.process_instance_id = process_instance_model.id @@ -480,6 +483,8 @@ class ProcessInstanceProcessor: process_instance_model, bpmn_process_spec, subprocesses=subprocesses, + include_task_data_for_completed_tasks=include_task_data_for_completed_tasks, + include_completed_subprocesses=include_completed_subprocesses, ) self.set_script_engine(self.bpmn_process_instance, self._script_engine) @@ -675,11 +680,11 @@ class ProcessInstanceProcessor: include_task_data_for_completed_tasks: bool = False, ) -> None: json_data_hashes = set() - states_to_not_rehydrate_data: list[str] = [] - if include_task_data_for_completed_tasks: - states_to_not_rehydrate_data = ["COMPLETED", "CANCELLED", "ERROR"] + states_to_exclude_from_rehydration: list[str] = [] + if not include_task_data_for_completed_tasks: + states_to_exclude_from_rehydration = ["COMPLETED", "CANCELLED", "ERROR"] for task in tasks: - if task.state not in states_to_not_rehydrate_data: + if task.state not in states_to_exclude_from_rehydration: json_data_hashes.add(task.json_data_hash) json_data_records = JsonDataModel.query.filter(JsonDataModel.hash.in_(json_data_hashes)).all() # type: ignore json_data_mappings = {} @@ -692,7 +697,7 @@ class ProcessInstanceProcessor: tasks_dict = spiff_bpmn_process_dict["subprocesses"][bpmn_subprocess_guid]["tasks"] tasks_dict[task.guid] = task.properties_json task_data = {} - if task.state not in states_to_not_rehydrate_data: + if task.state not in states_to_exclude_from_rehydration: task_data = json_data_mappings[task.json_data_hash] tasks_dict[task.guid]["data"] = task_data @@ -702,6 +707,7 @@ class ProcessInstanceProcessor: process_instance_model: ProcessInstanceModel, bpmn_definition_to_task_definitions_mappings: dict, include_task_data_for_completed_tasks: bool = False, + include_completed_subprocesses: bool = False, ) -> dict: if process_instance_model.bpmn_process_definition_id is None: return {} @@ -767,7 +773,14 @@ class ProcessInstanceProcessor: ) spiff_bpmn_process_dict.update(single_bpmn_process_dict) - bpmn_subprocesses = BpmnProcessModel.query.filter_by(top_level_process_id=bpmn_process.id).all() + bpmn_subprocesses_query = BpmnProcessModel.query.filter_by(top_level_process_id=bpmn_process.id) + if not include_completed_subprocesses: + bpmn_subprocesses_query = bpmn_subprocesses_query.join( + TaskModel, TaskModel.guid == BpmnProcessModel.guid + ).filter( + TaskModel.state.not_in(["COMPLETED", "ERROR", "CANCELLED"]) # type: ignore + ) + bpmn_subprocesses = bpmn_subprocesses_query.all() bpmn_subprocess_id_to_guid_mappings = {} for bpmn_subprocess in bpmn_subprocesses: subprocess_identifier = bpmn_subprocess.bpmn_process_definition.bpmn_identifier @@ -820,6 +833,7 @@ class ProcessInstanceProcessor: spec: BpmnProcessSpec | None = None, subprocesses: IdToBpmnProcessSpecMapping | None = None, include_task_data_for_completed_tasks: bool = False, + include_completed_subprocesses: bool = False, ) -> tuple[BpmnWorkflow, dict, dict]: full_bpmn_process_dict = {} bpmn_definition_to_task_definitions_mappings: dict = {} @@ -833,6 +847,7 @@ class ProcessInstanceProcessor: full_bpmn_process_dict = ProcessInstanceProcessor._get_full_bpmn_process_dict( process_instance_model, bpmn_definition_to_task_definitions_mappings, + include_completed_subprocesses=include_completed_subprocesses, include_task_data_for_completed_tasks=include_task_data_for_completed_tasks, ) # FIXME: the from_dict entrypoint in spiff will one day do this copy instead @@ -1247,7 +1262,9 @@ class ProcessInstanceProcessor: ProcessInstanceTmpService.add_event_to_process_instance( process_instance, ProcessInstanceEventType.process_instance_rewound_to_task.value, task_guid=to_task_guid ) - processor = ProcessInstanceProcessor(process_instance, include_task_data_for_completed_tasks=True) + processor = ProcessInstanceProcessor( + process_instance, include_task_data_for_completed_tasks=True, include_completed_subprocesses=True + ) deleted_tasks = processor.bpmn_process_instance.reset_from_task_id(UUID(to_task_guid)) spiff_tasks = processor.bpmn_process_instance.get_tasks() diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py b/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py index ae0a954a7..84e4a657b 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py @@ -2030,11 +2030,10 @@ class TestProcessApi(BaseTest): ) assert response.status_code == 400 assert process_instance.status == "error" - processor = ProcessInstanceProcessor(process_instance) + processor = ProcessInstanceProcessor(process_instance, include_task_data_for_completed_tasks=True) spiff_task = processor.get_task_by_bpmn_identifier("script_task_two", processor.bpmn_process_instance) assert spiff_task is not None - # TODO: remove these 2 lines if we enable no task data on rehydration again from pr-661 assert spiff_task.state == TaskState.ERROR assert spiff_task.data == {"my_var": "THE VAR"} diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_migrator.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_migrator.py index 22a1f04aa..219481446 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_migrator.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_migrator.py @@ -110,7 +110,9 @@ class TestProcessInstanceMigrator(BaseTest): process_instance = ProcessInstanceModel.query.filter_by(id=process_instance.id).first() # ensure data was imported correctly and is in expected state - processor = ProcessInstanceProcessor(process_instance) + processor = ProcessInstanceProcessor( + process_instance, include_task_data_for_completed_tasks=True, include_completed_subprocesses=True + ) bpmn_process_dict_version_3_after_import = processor.serialize() self.round_last_state_change(bpmn_process_dict_version_3) self.round_last_state_change(bpmn_process_dict_version_3_after_import) @@ -125,7 +127,9 @@ class TestProcessInstanceMigrator(BaseTest): Version4.run(process_instance) update_data_objects(bpmn_process_dict_version_4_from_spiff) process_instance = ProcessInstanceModel.query.filter_by(id=process_instance.id).first() - processor = ProcessInstanceProcessor(process_instance) + processor = ProcessInstanceProcessor( + process_instance, include_task_data_for_completed_tasks=True, include_completed_subprocesses=True + ) bpmn_process_dict_version_4 = processor.serialize() self.round_last_state_change(bpmn_process_dict_version_4) self.round_last_state_change(bpmn_process_dict_version_4_from_spiff) diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py index 5eb8fd8c1..8eca4c8d8 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py @@ -209,7 +209,7 @@ class TestProcessInstanceProcessor(BaseTest): processor.do_engine_steps(save=True) # ensure this does not raise - processor = ProcessInstanceProcessor(process_instance) + processor = ProcessInstanceProcessor(process_instance, include_completed_subprocesses=True) # this task will be found within subprocesses spiff_task = processor.__class__.get_task_by_bpmn_identifier("level_3_script_task", processor.bpmn_process_instance) @@ -330,8 +330,6 @@ class TestProcessInstanceProcessor(BaseTest): task_model_to_reset_to = all_task_models_matching_top_level_subprocess_script[0] assert task_model_to_reset_to is not None assert len(process_instance.human_tasks) == 3, "expected 3 human tasks before reset" - processor = ProcessInstanceProcessor(process_instance) - processor = ProcessInstanceProcessor(process_instance) ProcessInstanceProcessor.reset_process(process_instance, task_model_to_reset_to.guid) assert len(process_instance.human_tasks) == 2, "still expected 2 human tasks after reset" @@ -348,6 +346,8 @@ class TestProcessInstanceProcessor(BaseTest): task for task in ready_or_waiting_tasks if task.task_spec.name == "top_level_subprocess_script" ) assert top_level_subprocess_script_spiff_task is not None + # make sure we did not remove the data during the reset which can happen if include_task_data_for_completed_tasks is False + assert top_level_subprocess_script_spiff_task.data == {"set_in_top_level_script": 1} processor.resume() assert ( len(process_instance.human_tasks) == 2 @@ -412,7 +412,6 @@ class TestProcessInstanceProcessor(BaseTest): "manual_task_1", processor.bpmn_process_instance ) processor.suspend() - processor = ProcessInstanceProcessor(process_instance) ProcessInstanceProcessor.reset_process(process_instance, str(reset_to_spiff_task.id)) process_instance = ProcessInstanceModel.query.filter_by(id=process_instance.id).first() human_task_one = process_instance.active_human_tasks[0] @@ -527,9 +526,11 @@ class TestProcessInstanceProcessor(BaseTest): # recreate variables to ensure all bpmn json was recreated from scratch from the db process_instance_relookup = ProcessInstanceModel.query.filter_by(id=process_instance.id).first() - processor_final = ProcessInstanceProcessor(process_instance_relookup) - processor_final.do_engine_steps(save=True, execution_strategy_name="greedy") + processor_last_tasks = ProcessInstanceProcessor(process_instance_relookup) + processor_last_tasks.do_engine_steps(save=True, execution_strategy_name="greedy") + process_instance_relookup = ProcessInstanceModel.query.filter_by(id=process_instance.id).first() + processor_final = ProcessInstanceProcessor(process_instance_relookup, include_completed_subprocesses=True) assert process_instance_relookup.status == "complete" data_set_1 = {"set_in_top_level_script": 1}