From c0e072a2e2af03a97e392962ae3e116246a4e577 Mon Sep 17 00:00:00 2001 From: jasquat <2487833+jasquat@users.noreply.github.com> Date: Mon, 1 Jul 2024 16:05:51 -0400 Subject: [PATCH] Mi large data debug bpm only (#1846) * some debug code to help track down slow processing times for mi tasks with larger data sets w/ burnettk * use mapping cache for task models and bpmn processes and some linting cleanup * some minor bugfixes w/ burnettk * assign the correct bpmn process to the given guid w/ burnettk * fixed data migration tests w/ burnettk * unit tests are passing w/ burnettk * integration tests are passing w/ burnettk * some cleanup while code reviewing w/ burnettk * some more cleanup w/ burnettk * pass new args to TaskService in data migration w/ burnettk * fixed broken test w/ burnettk * allow running acceptance tests without keycloak w/ burnettk --------- Co-authored-by: jasquat --- .../bin/local_development_environment_setup | 3 +- spiffworkflow-backend/bin/tests-par | 2 +- .../src/spiffworkflow_backend/constants.py | 6 +- .../data_migrations/data_migration_base.py | 5 + .../data_migrations/version_2.py | 6 +- .../data_migrations/version_4.py | 4 +- .../data_migrations/version_5.py | 2 + .../services/process_instance_processor.py | 85 ++++++- .../services/task_service.py | 46 ++-- .../services/workflow_execution_service.py | 9 + ...nce_with_subprocess_and_large_dataset.bpmn | 222 ++++++++++++++++++ .../helpers/base_test.py | 10 +- .../test_process_models_controller.py | 1 + .../unit/test_process_instance_processor.py | 24 ++ 14 files changed, 389 insertions(+), 36 deletions(-) create mode 100644 spiffworkflow-backend/tests/data/multiinstance_with_subprocess_and_large_dataset/multiinstance_with_subprocess_and_large_dataset.bpmn diff --git a/spiffworkflow-backend/bin/local_development_environment_setup b/spiffworkflow-backend/bin/local_development_environment_setup index 5fd3acca..5feee70c 100755 --- a/spiffworkflow-backend/bin/local_development_environment_setup +++ b/spiffworkflow-backend/bin/local_development_environment_setup @@ -45,7 +45,8 @@ fi if [[ "$acceptance_test_mode" == "true" ]]; then export SPIFFWORKFLOW_BACKEND_LOAD_FIXTURE_DATA=true export SPIFFWORKFLOW_BACKEND_PERMISSIONS_FILE_NAME=acceptance_tests.yml -elif [[ "$use_local_open_id" == "true" ]]; then +fi +if [[ "$use_local_open_id" == "true" ]]; then backend_base_url="${SPIFFWORKFLOW_BACKEND_URL:-}" if [[ -z "$backend_base_url" ]]; then backend_base_url="http://localhost:$PORT" fi diff --git a/spiffworkflow-backend/bin/tests-par b/spiffworkflow-backend/bin/tests-par index 52f3200b..2722d667 100755 --- a/spiffworkflow-backend/bin/tests-par +++ b/spiffworkflow-backend/bin/tests-par @@ -18,4 +18,4 @@ if ! 
poetry run python -c "import xdist" &>/dev/null; then exit 1 fi -SPIFFWORKFLOW_BACKEND_DATABASE_TYPE=sqlite poet test -n auto -x --random-order +SPIFFWORKFLOW_BACKEND_DATABASE_TYPE=sqlite poet test -n auto -x --random-order "$@" diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/constants.py b/spiffworkflow-backend/src/spiffworkflow_backend/constants.py index dd182caa..beec94d6 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/constants.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/constants.py @@ -3,8 +3,8 @@ SPIFFWORKFLOW_BACKEND_SERIALIZER_VERSION = "5" # so we can tell if a migration has changed or if there is a new one SPIFFWORKFLOW_BACKEND_DATA_MIGRATION_CHECKSUM = { "version_1_3.py": "22636f5ffb8e6d56fa460d82f62a854c", - "version_2.py": "962b4fda4d466758bdbdc09d75099603", + "version_2.py": "d066710c58985e1db293931b01b00029", "version_3.py": "0e7154d0575c54b59011e3acedebe8b5", - "version_4.py": "889399d1c37e230a669d099f3a485fd4", - "version_5.py": "a5a9e62798b51741d4dd9d8f69868ded", + "version_4.py": "ed8a7a06d21fb3fbc7d1db5435c79058", + "version_5.py": "249d9a406098d15b7bf549b51d68c85e", } diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/data_migrations/data_migration_base.py b/spiffworkflow-backend/src/spiffworkflow_backend/data_migrations/data_migration_base.py index 93d2b15a..534874a4 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/data_migrations/data_migration_base.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/data_migrations/data_migration_base.py @@ -3,6 +3,7 @@ from __future__ import annotations import abc from typing import Any +from flask import current_app from spiffworkflow_backend.models.process_instance import ProcessInstanceModel @@ -33,3 +34,7 @@ class DataMigrationBase(metaclass=abc.ABCMeta): @abc.abstractmethod def run(cls, process_instance: ProcessInstanceModel) -> None: raise NotImplementedError("method must be implemented on subclass: run") + + @classmethod + def should_raise_on_error(cls) -> bool: + return current_app.config.get("ENV_IDENTIFIER") in ["unit_testing", "local_development"] diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/data_migrations/version_2.py b/spiffworkflow-backend/src/spiffworkflow_backend/data_migrations/version_2.py index 59571182..734bb283 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/data_migrations/version_2.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/data_migrations/version_2.py @@ -22,7 +22,11 @@ class Version2(DataMigrationBase): spiff_tasks = processor.bpmn_process_instance.get_tasks(updated_ts=initial_time) task_service = TaskService( - process_instance, processor._serializer, processor.bpmn_definition_to_task_definitions_mappings + process_instance, + processor._serializer, + processor.bpmn_definition_to_task_definitions_mappings, + task_model_mapping=processor.task_model_mapping, + bpmn_subprocess_mapping=processor.bpmn_subprocess_mapping, ) # implicit begin db transaction diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/data_migrations/version_4.py b/spiffworkflow-backend/src/spiffworkflow_backend/data_migrations/version_4.py index 65f75d94..f7d6757a 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/data_migrations/version_4.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/data_migrations/version_4.py @@ -13,7 +13,7 @@ class Version4(DataMigrationBase): return "4" @classmethod - def run(cls, process_instance: ProcessInstanceModel) -> None: + def run(cls, process_instance: 
ProcessInstanceModel, should_raise_on_error: bool = False) -> None: try: processor = ProcessInstanceProcessor( process_instance, include_task_data_for_completed_tasks=True, include_completed_subprocesses=True ) @@ -28,6 +28,8 @@ class Version4(DataMigrationBase): ) except Exception as ex: + if cls.should_raise_on_error(): + raise ex current_app.logger.warning(f"Failed to migrate process_instance '{process_instance.id}'. The error was {str(ex)}") @classmethod diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/data_migrations/version_5.py b/spiffworkflow-backend/src/spiffworkflow_backend/data_migrations/version_5.py index 93249a8c..8ccfdd77 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/data_migrations/version_5.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/data_migrations/version_5.py @@ -35,4 +35,6 @@ class Version5(DataMigrationBase): task.properties_json = new_properties_json db.session.add(task) except Exception as ex: + if cls.should_raise_on_error(): + raise ex current_app.logger.warning(f"Failed to migrate process_instance '{process_instance.id}'. The error was {str(ex)}") diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index 3714c28a..9e8c6a83 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -463,6 +463,12 @@ class ProcessInstanceProcessor: bpmn_process_spec = None self.full_bpmn_process_dict: dict = {} + # mappings of tasks and bpmn subprocesses to the model objects so we can avoid unnecessary queries in the TaskService. + # only subprocesses should be necessary since the top-level process is on the process-instance and sqlalchemy + # should help us cache it in memory. it also does not have a guid which is why we just avoid caching it in this system. + self.task_model_mapping: dict[str, TaskModel] = {} + self.bpmn_subprocess_mapping: dict[str, BpmnProcessModel] = {} + # this caches the bpmn_process_definition_identifier and task_identifier back to the bpmn_process_id # in the database. This is to cut down on database queries while adding new tasks to the database. 
# Structure: @@ -491,12 +497,14 @@ class ProcessInstanceProcessor: self.bpmn_process_instance, self.full_bpmn_process_dict, self.bpmn_definition_to_task_definitions_mappings, - ) = self.__get_bpmn_process_instance( + ) = self.__class__.__get_bpmn_process_instance( process_instance_model, - bpmn_process_spec, + spec=bpmn_process_spec, subprocesses=subprocesses, include_task_data_for_completed_tasks=include_task_data_for_completed_tasks, include_completed_subprocesses=include_completed_subprocesses, + task_model_mapping=self.task_model_mapping, + bpmn_subprocess_mapping=self.bpmn_subprocess_mapping, ) self.set_script_engine(self.bpmn_process_instance, self._script_engine) @@ -523,16 +531,18 @@ class ProcessInstanceProcessor: process_instance_model=process_instance_model, force_update=True, ) + bpmn_process_instance = cls.initialize_bpmn_process_instance(bpmn_process_dict) + task_model_mapping, bpmn_subprocess_mapping = cls.get_db_mappings_from_bpmn_process_dict(bpmn_process_dict) + task_service = TaskService( process_instance=process_instance_model, serializer=cls._serializer, bpmn_definition_to_task_definitions_mappings=bpmn_definition_to_task_definitions_mappings, force_update_definitions=True, + task_model_mapping=task_model_mapping, + bpmn_subprocess_mapping=bpmn_subprocess_mapping, ) - process_copy = copy.deepcopy(bpmn_process_dict) - bpmn_process_instance = cls._serializer.from_dict(process_copy) - bpmn_process_instance.script_engine = cls._default_script_engine for spiff_task in bpmn_process_instance.get_tasks(): start_and_end_times: StartAndEndTimes | None = None if spiff_task.has_state(TaskState.COMPLETED | TaskState.ERROR): @@ -546,6 +556,29 @@ class ProcessInstanceProcessor: task_service.save_objects_to_database() db.session.commit() + @classmethod + def initialize_bpmn_process_instance(cls, bpmn_process_dict: dict) -> BpmnWorkflow: + process_copy = copy.deepcopy(bpmn_process_dict) + bpmn_process_instance = cls._serializer.from_dict(process_copy) + bpmn_process_instance.script_engine = cls._default_script_engine + return bpmn_process_instance + + @classmethod + def get_db_mappings_from_bpmn_process_dict( + cls, bpmn_process_dict: dict + ) -> tuple[dict[str, TaskModel], dict[str, BpmnProcessModel]]: + task_guids = set(bpmn_process_dict["tasks"].keys()) + bpmn_process_guids = set() + for subproc_guid, subproc_dict in bpmn_process_dict["subprocesses"].items(): + new_set = set(subproc_dict["tasks"].keys()) + task_guids.update(new_set) + bpmn_process_guids.add(subproc_guid) + task_models = TaskModel.query.filter(TaskModel.guid.in_(task_guids)).all() # type: ignore + bpmn_process_models = BpmnProcessModel.query.filter(BpmnProcessModel.guid.in_(bpmn_process_guids)).all() # type: ignore + task_model_mapping = {t.guid: t for t in task_models} + bpmn_subprocess_mapping = {b.guid: b for b in bpmn_process_models} + return (task_model_mapping, bpmn_subprocess_mapping) + @classmethod def get_process_model_and_subprocesses( cls, @@ -673,6 +706,7 @@ class ProcessInstanceProcessor: def _get_bpmn_process_dict( cls, bpmn_process: BpmnProcessModel, + task_model_mapping: dict[str, TaskModel], get_tasks: bool = False, include_task_data_for_completed_tasks: bool = False, ) -> dict: @@ -682,7 +716,10 @@ class ProcessInstanceProcessor: if get_tasks: tasks = TaskModel.query.filter_by(bpmn_process_id=bpmn_process.id).all() cls._get_tasks_dict( - tasks, bpmn_process_dict, include_task_data_for_completed_tasks=include_task_data_for_completed_tasks + tasks, + bpmn_process_dict, + 
include_task_data_for_completed_tasks=include_task_data_for_completed_tasks, + task_model_mapping=task_model_mapping, ) return bpmn_process_dict @@ -691,6 +728,7 @@ class ProcessInstanceProcessor: cls, tasks: list[TaskModel], spiff_bpmn_process_dict: dict, + task_model_mapping: dict[str, TaskModel], bpmn_subprocess_id_to_guid_mappings: dict | None = None, include_task_data_for_completed_tasks: bool = False, ) -> None: @@ -739,12 +777,15 @@ class ProcessInstanceProcessor: if task.guid in task_guids_to_add: task_data = json_data_mappings[task.json_data_hash] tasks_dict[task.guid]["data"] = task_data + task_model_mapping[task.guid] = task @classmethod def _get_full_bpmn_process_dict( cls, process_instance_model: ProcessInstanceModel, bpmn_definition_to_task_definitions_mappings: dict, + task_model_mapping: dict[str, TaskModel], + bpmn_subprocess_mapping: dict[str, BpmnProcessModel], include_task_data_for_completed_tasks: bool = False, include_completed_subprocesses: bool = False, ) -> dict: @@ -772,7 +813,10 @@ class ProcessInstanceProcessor: bpmn_process = process_instance_model.bpmn_process if bpmn_process is not None: single_bpmn_process_dict = cls._get_bpmn_process_dict( - bpmn_process, get_tasks=True, include_task_data_for_completed_tasks=include_task_data_for_completed_tasks + bpmn_process, + get_tasks=True, + include_task_data_for_completed_tasks=include_task_data_for_completed_tasks, + task_model_mapping=task_model_mapping, ) spiff_bpmn_process_dict.update(single_bpmn_process_dict) @@ -791,8 +835,9 @@ class ProcessInstanceProcessor: current_app.logger.info(f"Deferring subprocess spec: '{subprocess_identifier}'") continue bpmn_subprocess_id_to_guid_mappings[bpmn_subprocess.id] = bpmn_subprocess.guid - single_bpmn_process_dict = cls._get_bpmn_process_dict(bpmn_subprocess) + single_bpmn_process_dict = cls._get_bpmn_process_dict(bpmn_subprocess, task_model_mapping=task_model_mapping) spiff_bpmn_process_dict["subprocesses"][bpmn_subprocess.guid] = single_bpmn_process_dict + bpmn_subprocess_mapping[bpmn_subprocess.guid] = bpmn_subprocess tasks = TaskModel.query.filter( TaskModel.bpmn_process_id.in_(bpmn_subprocess_id_to_guid_mappings.keys()) # type: ignore @@ -800,8 +845,9 @@ class ProcessInstanceProcessor: cls._get_tasks_dict( tasks, spiff_bpmn_process_dict, - bpmn_subprocess_id_to_guid_mappings, + bpmn_subprocess_id_to_guid_mappings=bpmn_subprocess_id_to_guid_mappings, include_task_data_for_completed_tasks=include_task_data_for_completed_tasks, + task_model_mapping=task_model_mapping, ) return spiff_bpmn_process_dict @@ -833,6 +879,8 @@ class ProcessInstanceProcessor: @staticmethod def __get_bpmn_process_instance( process_instance_model: ProcessInstanceModel, + task_model_mapping: dict[str, TaskModel], + bpmn_subprocess_mapping: dict[str, BpmnProcessModel], spec: BpmnProcessSpec | None = None, subprocesses: IdToBpmnProcessSpecMapping | None = None, include_task_data_for_completed_tasks: bool = False, @@ -852,6 +900,8 @@ class ProcessInstanceProcessor: bpmn_definition_to_task_definitions_mappings, include_completed_subprocesses=include_completed_subprocesses, include_task_data_for_completed_tasks=include_task_data_for_completed_tasks, + task_model_mapping=task_model_mapping, + bpmn_subprocess_mapping=bpmn_subprocess_mapping, ) # FIXME: the from_dict entrypoint in spiff will one day do this copy instead process_copy = copy.deepcopy(full_bpmn_process_dict) @@ -1244,6 +1294,8 @@ class ProcessInstanceProcessor: process_instance=self.process_instance_model, serializer=self._serializer, 
bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings, + bpmn_subprocess_mapping=self.bpmn_subprocess_mapping, + task_model_mapping=self.task_model_mapping, ) task_service.update_all_tasks_from_spiff_tasks(spiff_tasks, [], start_time) ProcessInstanceTmpService.add_event_to_process_instance(self.process_instance_model, event_type, task_guid=task_id) @@ -1267,10 +1319,18 @@ class ProcessInstanceProcessor: deleted_tasks = processor.bpmn_process_instance.reset_from_task_id(UUID(to_task_guid)) spiff_tasks = processor.bpmn_process_instance.get_tasks() + for dt in deleted_tasks: + if str(dt.id) in processor.task_model_mapping: + del processor.task_model_mapping[str(dt.id)] + if str(dt.id) in processor.bpmn_subprocess_mapping: + del processor.bpmn_subprocess_mapping[str(dt.id)] + task_service = TaskService( process_instance=processor.process_instance_model, serializer=processor._serializer, bpmn_definition_to_task_definitions_mappings=processor.bpmn_definition_to_task_definitions_mappings, + task_model_mapping=processor.task_model_mapping, + bpmn_subprocess_mapping=processor.bpmn_subprocess_mapping, ) task_service.update_all_tasks_from_spiff_tasks(spiff_tasks, deleted_tasks, start_time, to_task_guid=to_task_guid) @@ -1485,6 +1545,8 @@ class ProcessInstanceProcessor: serializer=self._serializer, process_instance=self.process_instance_model, bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings, + bpmn_subprocess_mapping=self.bpmn_subprocess_mapping, + task_model_mapping=self.task_model_mapping, ) if execution_strategy is None: @@ -1508,6 +1570,7 @@ class ProcessInstanceProcessor: save, should_schedule_waiting_timer_events=should_schedule_waiting_timer_events, ) + self.task_model_mapping, self.bpmn_subprocess_mapping = task_model_delegate.get_guid_to_db_object_mappings() self.check_all_tasks() return task_runnability @@ -1684,6 +1747,8 @@ class ProcessInstanceProcessor: serializer=self._serializer, bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings, run_started_at=run_started_at, + bpmn_subprocess_mapping=self.bpmn_subprocess_mapping, + task_model_mapping=self.task_model_mapping, ) task_service.update_task_model(task_model, spiff_task) JsonDataModel.insert_or_update_json_data_records(task_service.json_data_dicts) @@ -1799,6 +1864,8 @@ class ProcessInstanceProcessor: process_instance=self.process_instance_model, serializer=self._serializer, bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings, + bpmn_subprocess_mapping=self.bpmn_subprocess_mapping, + task_model_mapping=self.task_model_mapping, ) task_service.update_all_tasks_from_spiff_tasks(spiff_tasks, deleted_tasks, start_time) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py index 5d3a887d..41a30457 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py @@ -111,10 +111,20 @@ class TaskService: bpmn_definition_to_task_definitions_mappings: dict, run_started_at: float | None = None, force_update_definitions: bool = False, + task_model_mapping: dict[str, TaskModel] | None = None, + bpmn_subprocess_mapping: dict[str, BpmnProcessModel] | None = None, ) -> None: self.process_instance = process_instance self.bpmn_definition_to_task_definitions_mappings = 
bpmn_definition_to_task_definitions_mappings self.serializer = serializer + self.task_model_mapping = task_model_mapping or {} + self.bpmn_subprocess_mapping = bpmn_subprocess_mapping or {} + + self.bpmn_subprocess_id_mapping: dict[int, BpmnProcessModel] = {} + for bs in self.bpmn_subprocess_mapping.values(): + self.bpmn_subprocess_id_mapping[bs.id] = bs + if self.process_instance.bpmn_process_id is not None: + self.bpmn_subprocess_id_mapping[self.process_instance.bpmn_process_id] = self.process_instance.bpmn_process # this updates the definition ids for both tasks and bpmn_processes when they are updated # in case definitions were changed for the same instances. @@ -200,9 +210,7 @@ class TaskService: ) # we are not sure why task_model.bpmn_process can be None while task_model.bpmn_process_id actually has a valid value - bpmn_process = ( - new_bpmn_process or task_model.bpmn_process or BpmnProcessModel.query.filter_by(id=task_model.bpmn_process_id).first() - ) + bpmn_process = new_bpmn_process or task_model.bpmn_process or self.bpmn_subprocess_id_mapping[task_model.bpmn_process_id] self.update_task_model(task_model, spiff_task) bpmn_process_json_data = self.update_task_data_on_bpmn_process(bpmn_process, bpmn_process_instance=spiff_task.workflow) @@ -263,8 +271,8 @@ class TaskService: self.bpmn_processes[bpmn_process.guid or "top_level"] = bpmn_process - if spiff_workflow.parent_task_id: - direct_parent_bpmn_process = BpmnProcessModel.query.filter_by(id=bpmn_process.direct_parent_process_id).first() + if spiff_workflow.parent_task_id and bpmn_process.direct_parent_process_id: + direct_parent_bpmn_process = self.bpmn_subprocess_id_mapping[bpmn_process.direct_parent_process_id] self.update_bpmn_process(spiff_workflow.parent_workflow, direct_parent_bpmn_process) if self.force_update_definitions is True: @@ -316,9 +324,7 @@ class TaskService: task_model: TaskModel | None = TaskModel.query.filter_by(guid=spiff_task_guid).first() bpmn_process = None if task_model is None: - bpmn_process = self.task_bpmn_process( - spiff_task, - ) + bpmn_process = self.task_bpmn_process(spiff_task) task_definition = self.bpmn_definition_to_task_definitions_mappings[spiff_task.workflow.spec.name][ spiff_task.task_spec.name ] @@ -335,9 +341,9 @@ class TaskService: self, spiff_task: SpiffTask, ) -> BpmnProcessModel: - subprocess_guid, subprocess = self.__class__._task_subprocess(spiff_task) + subprocess_guid, spiff_subprocess = self.__class__._task_subprocess(spiff_task) bpmn_process: BpmnProcessModel | None = None - if subprocess is None: + if spiff_subprocess is None: bpmn_process = self.process_instance.bpmn_process # This is the top level workflow, which has no guid # check for bpmn_process_id because mypy doesn't realize bpmn_process can be None @@ -348,11 +354,13 @@ class TaskService: spiff_workflow=spiff_workflow, ) else: - bpmn_process = BpmnProcessModel.query.filter_by(guid=subprocess_guid).first() + bpmn_process = None + if subprocess_guid is not None: + bpmn_process = self.bpmn_subprocess_mapping.get(subprocess_guid) if bpmn_process is None: spiff_workflow = spiff_task.workflow bpmn_process = self.add_bpmn_process( - bpmn_process_dict=self.serializer.to_dict(subprocess), + bpmn_process_dict=self.serializer.to_dict(spiff_subprocess), top_level_process=self.process_instance.bpmn_process, bpmn_process_guid=subprocess_guid, spiff_workflow=spiff_workflow, @@ -382,10 +390,8 @@ class TaskService: bpmn_process_dict.pop("subprocess_specs") bpmn_process = None - if top_level_process is not None: - bpmn_process = 
BpmnProcessModel.query.filter_by( - top_level_process_id=top_level_process.id, guid=bpmn_process_guid - ).first() + if top_level_process is not None and bpmn_process_guid is not None: + bpmn_process = self.bpmn_subprocess_mapping.get(bpmn_process_guid) elif self.process_instance.bpmn_process_id is not None: bpmn_process = self.process_instance.bpmn_process @@ -401,7 +407,7 @@ class TaskService: if top_level_process is not None: subprocesses = spiff_workflow.top_workflow.subprocesses - direct_bpmn_process_parent = top_level_process + direct_bpmn_process_parent: BpmnProcessModel | None = top_level_process # BpmnWorkflows do not know their own guid so we have to cycle through subprocesses to find the guid that matches # calling list(subprocesses) to make a copy of the keys so we can change subprocesses while iterating @@ -410,7 +416,7 @@ class TaskService: for subprocess_guid in list(subprocesses): subprocess = subprocesses[subprocess_guid] if subprocess == spiff_workflow.parent_workflow: - direct_bpmn_process_parent = BpmnProcessModel.query.filter_by(guid=str(subprocess_guid)).first() + direct_bpmn_process_parent = self.bpmn_subprocess_mapping.get(str(subprocess_guid)) if direct_bpmn_process_parent is None: raise BpmnProcessNotFoundError( f"Could not find bpmn process with guid: {str(subprocess_guid)} " @@ -445,6 +451,10 @@ class TaskService: spiff_workflow=spiff_workflow, bpmn_process=bpmn_process, ) + if bpmn_process.guid is not None: + self.bpmn_subprocess_mapping[bpmn_process.guid] = bpmn_process + self.bpmn_subprocess_id_mapping[bpmn_process.id] = bpmn_process + return bpmn_process def add_tasks_to_bpmn_process( diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py index 8ec04c03..daf0f369 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py @@ -29,12 +29,14 @@ from spiffworkflow_backend.background_processing.celery_tasks.process_instance_t from spiffworkflow_backend.data_stores.kkv import KKVDataStore from spiffworkflow_backend.exceptions.api_error import ApiError from spiffworkflow_backend.helpers.spiff_enum import SpiffEnum +from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel from spiffworkflow_backend.models.db import db from spiffworkflow_backend.models.future_task import FutureTaskModel from spiffworkflow_backend.models.message_instance import MessageInstanceModel from spiffworkflow_backend.models.message_instance_correlation import MessageInstanceCorrelationRuleModel from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventType +from spiffworkflow_backend.models.task import TaskModel # noqa: F401 from spiffworkflow_backend.models.user import UserModel from spiffworkflow_backend.services.assertion_service import safe_assertion from spiffworkflow_backend.services.jinja_service import JinjaService @@ -249,6 +251,8 @@ class TaskModelSavingDelegate(EngineStepDelegate): process_instance: ProcessInstanceModel, bpmn_definition_to_task_definitions_mappings: dict, secondary_engine_step_delegate: EngineStepDelegate | None = None, + task_model_mapping: dict[str, TaskModel] | None = None, + bpmn_subprocess_mapping: dict[str, BpmnProcessModel] | None = None, ) -> None: 
self.secondary_engine_step_delegate = secondary_engine_step_delegate self.process_instance = process_instance @@ -266,6 +270,8 @@ class TaskModelSavingDelegate(EngineStepDelegate): serializer=self.serializer, bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings, run_started_at=time.time(), + task_model_mapping=task_model_mapping, + bpmn_subprocess_mapping=bpmn_subprocess_mapping, ) def will_complete_task(self, spiff_task: SpiffTask) -> None: @@ -334,6 +340,9 @@ class TaskModelSavingDelegate(EngineStepDelegate): def on_exception(self, bpmn_process_instance: BpmnWorkflow) -> None: self.after_engine_steps(bpmn_process_instance) + def get_guid_to_db_object_mappings(self) -> tuple[dict[str, TaskModel], dict[str, BpmnProcessModel]]: + return (self.task_service.task_model_mapping, self.task_service.bpmn_subprocess_mapping) + def _should_update_task_model(self) -> bool: """No reason to save task model stuff if the process instance isn't persistent.""" return self.process_instance.persistence_level != "none" diff --git a/spiffworkflow-backend/tests/data/multiinstance_with_subprocess_and_large_dataset/multiinstance_with_subprocess_and_large_dataset.bpmn b/spiffworkflow-backend/tests/data/multiinstance_with_subprocess_and_large_dataset/multiinstance_with_subprocess_and_large_dataset.bpmn new file mode 100644 index 00000000..216e3505 --- /dev/null +++ b/spiffworkflow-backend/tests/data/multiinstance_with_subprocess_and_large_dataset/multiinstance_with_subprocess_and_large_dataset.bpmn @@ -0,0 +1,222 @@ [The 222 lines of BPMN XML for this new test model were garbled in extraction (the markup was stripped, leaving only text nodes) and are elided here. Recoverable content: a script task seeds input_collection_size_list = [5, 10, 20, 40, 60] #, 160, 320] with loop_cnt = 0 and loop_max = len(input_collection_size_list); each iteration builds an input_collection of the current size, runs a multiinstance subprocess over it (output_element = input_element, collected into output_collection), increments loop_cnt, and deletes output_collection; an exclusive gateway ends the process once loop_cnt == loop_max.] diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/helpers/base_test.py b/spiffworkflow-backend/tests/spiffworkflow_backend/helpers/base_test.py index 141fb159..734165d3 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/helpers/base_test.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/helpers/base_test.py @@ -306,19 +306,25 @@ class BaseTest: process_model: ProcessModelInfo, status: str | None = "not_started", user: UserModel | None = None, + save_start_and_end_times: bool = True, ) -> ProcessInstanceModel: if user is None: user = self.find_or_create_user() current_time = round(time.time()) + start_in_seconds = None + end_in_seconds = None + if save_start_and_end_times: + start_in_seconds = current_time - (3600 * 1) + 
end_in_seconds = current_time - (3600 * 1 - 20) process_instance = ProcessInstanceModel( status=status, process_initiator=user, process_model_identifier=process_model.id, process_model_display_name=process_model.display_name, updated_at_in_seconds=round(time.time()), - start_in_seconds=current_time - (3600 * 1), - end_in_seconds=current_time - (3600 * 1 - 20), + start_in_seconds=start_in_seconds, + end_in_seconds=end_in_seconds, ) db.session.add(process_instance) db.session.commit() diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_models_controller.py b/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_models_controller.py index bb5f6995..f780a66a 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_models_controller.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_models_controller.py @@ -133,6 +133,7 @@ class TestProcessModelsController(BaseTest): mock_post.return_value.ok = True mock_post.return_value.text = json.dumps(connector_response) processor.do_engine_steps(save=True) + self.complete_next_manual_task(processor, execution_mode="synchronous") self.complete_next_manual_task(processor, execution_mode="synchronous", data={"firstName": "Chuck"}) process_instance = ProcessInstanceModel.query.filter_by(id=process_instance_id).first() diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py index 4a37802e..90aa1b4c 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py @@ -1076,3 +1076,27 @@ class TestProcessInstanceProcessor(BaseTest): assert process_instance.summary is not None # mypy thinks this is unreachable but it is reachable. summary can be str | None assert len(process_instance.summary) == 255 # type: ignore + + # To test processing times with multiinstance subprocesses + # def test_large_multiinstance( + # self, + # app: Flask, + # client: FlaskClient, + # with_db_and_bpmn_file_cleanup: None, + # ) -> None: + # process_model = load_test_spec( + # process_model_id="test_group/multiinstance_with_subprocess_and_large_dataset", + # process_model_source_directory="multiinstance_with_subprocess_and_large_dataset", + # ) + # process_instance = self.create_process_instance_from_process_model( + # process_model=process_model, save_start_and_end_times=False + # ) + # + # processor = ProcessInstanceProcessor(process_instance) + # # start_time = time.time() + # processor.do_engine_steps(save=True, execution_strategy_name="greedy") + # # end_time = time.time() + # # duration = end_time - start_time + # # assert processor.process_instance_model.end_in_seconds is not None + # # duration = processor.process_instance_model.end_in_seconds - processor.process_instance_model.created_at_in_seconds + # # print(f"➡️ ➡️ ➡️ duration: {duration}")
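The heart of this patch is the pair of guid -> model dictionaries (task_model_mapping and bpmn_subprocess_mapping) that ProcessInstanceProcessor bulk-loads once and threads through TaskService and TaskModelSavingDelegate, replacing the per-task TaskModel.query / BpmnProcessModel.query round trips that made large multiinstance runs slow. Below is a minimal, self-contained Python sketch of that pattern; FakeTaskModel and FakeQuery are illustrative stand-ins, not the project's real SQLAlchemy models, and the round-trip counter exists only to show the N+1 queries collapsing to one.

from dataclasses import dataclass


@dataclass
class FakeTaskModel:
    guid: str
    state: str = "FUTURE"


class FakeQuery:
    """Pretend database table that counts round trips."""

    def __init__(self, rows: list[FakeTaskModel]) -> None:
        self._rows = {row.guid: row for row in rows}
        self.round_trips = 0

    def filter_by_guid(self, guid: str) -> FakeTaskModel | None:
        self.round_trips += 1  # one SELECT per call: the N+1 pattern
        return self._rows.get(guid)

    def filter_guid_in(self, guids: set[str]) -> list[FakeTaskModel]:
        self.round_trips += 1  # one bulk SELECT regardless of guid count
        return [row for guid, row in self._rows.items() if guid in guids]


def update_tasks_without_cache(query: FakeQuery, guids: list[str]) -> None:
    for guid in guids:  # slow for large MI datasets: one query per task
        task = query.filter_by_guid(guid)
        if task is not None:
            task.state = "COMPLETED"


def update_tasks_with_cache(query: FakeQuery, guids: list[str]) -> None:
    # bulk-load once, then use dict lookups: the task_model_mapping idea
    task_model_mapping = {t.guid: t for t in query.filter_guid_in(set(guids))}
    for guid in guids:
        task = task_model_mapping.get(guid)
        if task is not None:
            task.state = "COMPLETED"


if __name__ == "__main__":
    guids = [f"guid-{i}" for i in range(1000)]
    q1 = FakeQuery([FakeTaskModel(g) for g in guids])
    update_tasks_without_cache(q1, guids)
    q2 = FakeQuery([FakeTaskModel(g) for g in guids])
    update_tasks_with_cache(q2, guids)
    print(q1.round_trips, "vs", q2.round_trips)  # prints: 1000 vs 1

This is also why the patch evicts deleted guids from the mappings in reset_from_task_id and registers newly created processes in add_bpmn_process: the dictionaries must stay in sync with the database for the lifetime of the processor.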
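The data-migration changes apply a second, smaller pattern: DataMigrationBase.should_raise_on_error() makes Version4 and Version5 fail loudly in unit_testing and local_development so migration bugs surface in tests, while production-like environments only log a warning and continue, so one bad instance cannot abort a whole migration run. A rough sketch of that pattern, with simplified hypothetical names (STRICT_ENVS and migrate_instance are not the real API):

import logging

logger = logging.getLogger(__name__)

STRICT_ENVS = {"unit_testing", "local_development"}


def migrate_instance(instance_id: int, env: str) -> None:
    try:
        raise ValueError("simulated migration failure")  # stand-in for the real migration work
    except Exception as ex:
        if env in STRICT_ENVS:
            raise  # surface the bug immediately in tests and local dev
        logger.warning("Failed to migrate process_instance '%s'. The error was %s", instance_id, ex)


migrate_instance(42, env="production")      # logs a warning and keeps going
# migrate_instance(42, env="unit_testing")  # would re-raise the ValueError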