Mi large data debug bpm only (#1846)

* some debug code to help track down slow processing times for mi tasks with large data sets w/ burnettk

* use mapping cache for task models and bpmn processes and some linting cleanup

* some minor bugfixes w/ burnettk

* assign the correct bpmn process to the given guid w/ burnettk

* fixed data migration tests w/ burnettk

* unit tests are passing w/ burnettk

* integration tests are passing w/ burnettk

* some cleanup while code reviewing w/ burnettk

* some more cleanup w/ burnettk

* pass new args to TaskService in data migration w/ burnettk

* fixed broken test w/ burnettk

* allow running acceptance tests without keycloak w/ burnettk

---------

Co-authored-by: jasquat <jasquat@users.noreply.github.com>
This commit is contained in:
jasquat 2024-07-01 16:05:51 -04:00 committed by GitHub
parent 6db4ab669f
commit c0e072a2e2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 389 additions and 36 deletions

View File

@ -45,7 +45,8 @@ fi
if [[ "$acceptance_test_mode" == "true" ]]; then
export SPIFFWORKFLOW_BACKEND_LOAD_FIXTURE_DATA=true
export SPIFFWORKFLOW_BACKEND_PERMISSIONS_FILE_NAME=acceptance_tests.yml
elif [[ "$use_local_open_id" == "true" ]]; then
fi
if [[ "$use_local_open_id" == "true" ]]; then
backend_base_url="${SPIFFWORKFLOW_BACKEND_URL:-}"
if [[ -z "$backend_base_url" ]]; then
backend_base_url="http://localhost:$PORT"

View File

@ -18,4 +18,4 @@ if ! poetry run python -c "import xdist" &>/dev/null; then
exit 1
fi
SPIFFWORKFLOW_BACKEND_DATABASE_TYPE=sqlite poet test -n auto -x --random-order
SPIFFWORKFLOW_BACKEND_DATABASE_TYPE=sqlite poet test -n auto -x --random-order "$@"

View File

@ -3,8 +3,8 @@ SPIFFWORKFLOW_BACKEND_SERIALIZER_VERSION = "5"
# so we can tell if a migration has changed or if there is a new one
SPIFFWORKFLOW_BACKEND_DATA_MIGRATION_CHECKSUM = {
"version_1_3.py": "22636f5ffb8e6d56fa460d82f62a854c",
"version_2.py": "962b4fda4d466758bdbdc09d75099603",
"version_2.py": "d066710c58985e1db293931b01b00029",
"version_3.py": "0e7154d0575c54b59011e3acedebe8b5",
"version_4.py": "889399d1c37e230a669d099f3a485fd4",
"version_5.py": "a5a9e62798b51741d4dd9d8f69868ded",
"version_4.py": "ed8a7a06d21fb3fbc7d1db5435c79058",
"version_5.py": "249d9a406098d15b7bf549b51d68c85e",
}

View File

@ -3,6 +3,7 @@ from __future__ import annotations
import abc
from typing import Any
from flask import current_app
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
@ -33,3 +34,7 @@ class DataMigrationBase(metaclass=abc.ABCMeta):
@abc.abstractmethod
def run(cls, process_instance: ProcessInstanceModel) -> None:
raise NotImplementedError("method must be implemented on subclass: run")
@classmethod
def should_raise_on_error(cls) -> bool:
    """Return True when migration errors should abort instead of being logged and skipped.

    Errors are only re-raised in developer-facing environments so production
    migrations can continue past individual failing process instances.
    """
    strict_environments = ("unit_testing", "local_development")
    current_env = current_app.config.get("ENV_IDENTIFIER")
    return current_env in strict_environments

View File

@ -22,7 +22,11 @@ class Version2(DataMigrationBase):
spiff_tasks = processor.bpmn_process_instance.get_tasks(updated_ts=initial_time)
task_service = TaskService(
process_instance, processor._serializer, processor.bpmn_definition_to_task_definitions_mappings
process_instance,
processor._serializer,
processor.bpmn_definition_to_task_definitions_mappings,
task_model_mapping=processor.task_model_mapping,
bpmn_subprocess_mapping=processor.bpmn_subprocess_mapping,
)
# implicit begin db transaction

View File

@ -13,7 +13,7 @@ class Version4(DataMigrationBase):
return "4"
@classmethod
def run(cls, process_instance: ProcessInstanceModel) -> None:
def run(cls, process_instance: ProcessInstanceModel, should_raise_on_error: bool = False) -> None:
try:
processor = ProcessInstanceProcessor(
process_instance, include_task_data_for_completed_tasks=True, include_completed_subprocesses=True
@ -28,6 +28,8 @@ class Version4(DataMigrationBase):
)
except Exception as ex:
if cls.should_raise_on_error():
raise ex
current_app.logger.warning(f"Failed to migrate process_instance '{process_instance.id}'. The error was {str(ex)}")
@classmethod

View File

@ -35,4 +35,6 @@ class Version5(DataMigrationBase):
task.properties_json = new_properties_json
db.session.add(task)
except Exception as ex:
if cls.should_raise_on_error():
raise ex
current_app.logger.warning(f"Failed to migrate process_instance '{process_instance.id}'. The error was {str(ex)}")

View File

@ -463,6 +463,12 @@ class ProcessInstanceProcessor:
bpmn_process_spec = None
self.full_bpmn_process_dict: dict = {}
# mappings of tasks and bpmn subprocesses to the model objects so we can avoid unnecessary queries in the TaskService.
# only subprocesses should be necessary since the top-level process is on the process-instance and sqlalchemy
# should help us cache it in memory. it also does not have a guid, which is why we just avoid caching it in this system.
self.task_model_mapping: dict[str, TaskModel] = {}
self.bpmn_subprocess_mapping: dict[str, BpmnProcessModel] = {}
# this caches the bpmn_process_definition_identifier and task_identifier back to the bpmn_process_id
# in the database. This is to cut down on database queries while adding new tasks to the database.
# Structure:
@ -491,12 +497,14 @@ class ProcessInstanceProcessor:
self.bpmn_process_instance,
self.full_bpmn_process_dict,
self.bpmn_definition_to_task_definitions_mappings,
) = self.__get_bpmn_process_instance(
) = self.__class__.__get_bpmn_process_instance(
process_instance_model,
bpmn_process_spec,
spec=bpmn_process_spec,
subprocesses=subprocesses,
include_task_data_for_completed_tasks=include_task_data_for_completed_tasks,
include_completed_subprocesses=include_completed_subprocesses,
task_model_mapping=self.task_model_mapping,
bpmn_subprocess_mapping=self.bpmn_subprocess_mapping,
)
self.set_script_engine(self.bpmn_process_instance, self._script_engine)
@ -523,16 +531,18 @@ class ProcessInstanceProcessor:
process_instance_model=process_instance_model,
force_update=True,
)
bpmn_process_instance = cls.initialize_bpmn_process_instance(bpmn_process_dict)
task_model_mapping, bpmn_subprocess_mapping = cls.get_db_mappings_from_bpmn_process_dict(bpmn_process_dict)
task_service = TaskService(
process_instance=process_instance_model,
serializer=cls._serializer,
bpmn_definition_to_task_definitions_mappings=bpmn_definition_to_task_definitions_mappings,
force_update_definitions=True,
task_model_mapping=task_model_mapping,
bpmn_subprocess_mapping=bpmn_subprocess_mapping,
)
process_copy = copy.deepcopy(bpmn_process_dict)
bpmn_process_instance = cls._serializer.from_dict(process_copy)
bpmn_process_instance.script_engine = cls._default_script_engine
for spiff_task in bpmn_process_instance.get_tasks():
start_and_end_times: StartAndEndTimes | None = None
if spiff_task.has_state(TaskState.COMPLETED | TaskState.ERROR):
@ -546,6 +556,29 @@ class ProcessInstanceProcessor:
task_service.save_objects_to_database()
db.session.commit()
@classmethod
def initialize_bpmn_process_instance(cls, bpmn_process_dict: dict) -> BpmnWorkflow:
    """Deserialize a bpmn process dict into a BpmnWorkflow with the default script engine attached.

    The input dict is deep-copied first so the serializer cannot mutate the caller's data.
    """
    dict_copy = copy.deepcopy(bpmn_process_dict)
    workflow = cls._serializer.from_dict(dict_copy)
    workflow.script_engine = cls._default_script_engine
    return workflow
@classmethod
def get_db_mappings_from_bpmn_process_dict(
    cls, bpmn_process_dict: dict
) -> tuple[dict[str, TaskModel], dict[str, BpmnProcessModel]]:
    """Build guid-to-model caches for the tasks and subprocesses referenced by a serialized process.

    Collects task guids from the top-level process and from every subprocess, then loads the
    matching TaskModel and BpmnProcessModel rows in two bulk queries so callers (TaskService)
    can avoid per-task database lookups.

    Returns a tuple of (task_guid -> TaskModel, subprocess_guid -> BpmnProcessModel).
    """
    task_guids = set(bpmn_process_dict["tasks"].keys())
    bpmn_process_guids = set()
    for subproc_guid, subproc_dict in bpmn_process_dict["subprocesses"].items():
        # BUG FIX: the original called task_guids.union(new_set) and discarded the result
        # (set.union returns a new set; it does not mutate). That meant subprocess task
        # guids were never collected, so subprocess tasks were never prefetched into the
        # mapping cache. update() mutates in place as intended.
        task_guids.update(subproc_dict["tasks"].keys())
        bpmn_process_guids.add(subproc_guid)
    task_models = TaskModel.query.filter(TaskModel.guid.in_(task_guids)).all()  # type: ignore
    bpmn_process_models = BpmnProcessModel.query.filter(BpmnProcessModel.guid.in_(bpmn_process_guids)).all()  # type: ignore
    task_model_mapping = {t.guid: t for t in task_models}
    bpmn_subprocess_mapping = {b.guid: b for b in bpmn_process_models}
    return (task_model_mapping, bpmn_subprocess_mapping)
@classmethod
def get_process_model_and_subprocesses(
cls,
@ -673,6 +706,7 @@ class ProcessInstanceProcessor:
def _get_bpmn_process_dict(
cls,
bpmn_process: BpmnProcessModel,
task_model_mapping: dict[str, TaskModel],
get_tasks: bool = False,
include_task_data_for_completed_tasks: bool = False,
) -> dict:
@ -682,7 +716,10 @@ class ProcessInstanceProcessor:
if get_tasks:
tasks = TaskModel.query.filter_by(bpmn_process_id=bpmn_process.id).all()
cls._get_tasks_dict(
tasks, bpmn_process_dict, include_task_data_for_completed_tasks=include_task_data_for_completed_tasks
tasks,
bpmn_process_dict,
include_task_data_for_completed_tasks=include_task_data_for_completed_tasks,
task_model_mapping=task_model_mapping,
)
return bpmn_process_dict
@ -691,6 +728,7 @@ class ProcessInstanceProcessor:
cls,
tasks: list[TaskModel],
spiff_bpmn_process_dict: dict,
task_model_mapping: dict[str, TaskModel],
bpmn_subprocess_id_to_guid_mappings: dict | None = None,
include_task_data_for_completed_tasks: bool = False,
) -> None:
@ -739,12 +777,15 @@ class ProcessInstanceProcessor:
if task.guid in task_guids_to_add:
task_data = json_data_mappings[task.json_data_hash]
tasks_dict[task.guid]["data"] = task_data
task_model_mapping[task.guid] = task
@classmethod
def _get_full_bpmn_process_dict(
cls,
process_instance_model: ProcessInstanceModel,
bpmn_definition_to_task_definitions_mappings: dict,
task_model_mapping: dict[str, TaskModel],
bpmn_subprocess_mapping: dict[str, BpmnProcessModel],
include_task_data_for_completed_tasks: bool = False,
include_completed_subprocesses: bool = False,
) -> dict:
@ -772,7 +813,10 @@ class ProcessInstanceProcessor:
bpmn_process = process_instance_model.bpmn_process
if bpmn_process is not None:
single_bpmn_process_dict = cls._get_bpmn_process_dict(
bpmn_process, get_tasks=True, include_task_data_for_completed_tasks=include_task_data_for_completed_tasks
bpmn_process,
get_tasks=True,
include_task_data_for_completed_tasks=include_task_data_for_completed_tasks,
task_model_mapping=task_model_mapping,
)
spiff_bpmn_process_dict.update(single_bpmn_process_dict)
@ -791,8 +835,9 @@ class ProcessInstanceProcessor:
current_app.logger.info(f"Deferring subprocess spec: '{subprocess_identifier}'")
continue
bpmn_subprocess_id_to_guid_mappings[bpmn_subprocess.id] = bpmn_subprocess.guid
single_bpmn_process_dict = cls._get_bpmn_process_dict(bpmn_subprocess)
single_bpmn_process_dict = cls._get_bpmn_process_dict(bpmn_subprocess, task_model_mapping=task_model_mapping)
spiff_bpmn_process_dict["subprocesses"][bpmn_subprocess.guid] = single_bpmn_process_dict
bpmn_subprocess_mapping[bpmn_subprocess.guid] = bpmn_subprocess
tasks = TaskModel.query.filter(
TaskModel.bpmn_process_id.in_(bpmn_subprocess_id_to_guid_mappings.keys()) # type: ignore
@ -800,8 +845,9 @@ class ProcessInstanceProcessor:
cls._get_tasks_dict(
tasks,
spiff_bpmn_process_dict,
bpmn_subprocess_id_to_guid_mappings,
bpmn_subprocess_id_to_guid_mappings=bpmn_subprocess_id_to_guid_mappings,
include_task_data_for_completed_tasks=include_task_data_for_completed_tasks,
task_model_mapping=task_model_mapping,
)
return spiff_bpmn_process_dict
@ -833,6 +879,8 @@ class ProcessInstanceProcessor:
@staticmethod
def __get_bpmn_process_instance(
process_instance_model: ProcessInstanceModel,
task_model_mapping: dict[str, TaskModel],
bpmn_subprocess_mapping: dict[str, BpmnProcessModel],
spec: BpmnProcessSpec | None = None,
subprocesses: IdToBpmnProcessSpecMapping | None = None,
include_task_data_for_completed_tasks: bool = False,
@ -852,6 +900,8 @@ class ProcessInstanceProcessor:
bpmn_definition_to_task_definitions_mappings,
include_completed_subprocesses=include_completed_subprocesses,
include_task_data_for_completed_tasks=include_task_data_for_completed_tasks,
task_model_mapping=task_model_mapping,
bpmn_subprocess_mapping=bpmn_subprocess_mapping,
)
# FIXME: the from_dict entrypoint in spiff will one day do this copy instead
process_copy = copy.deepcopy(full_bpmn_process_dict)
@ -1244,6 +1294,8 @@ class ProcessInstanceProcessor:
process_instance=self.process_instance_model,
serializer=self._serializer,
bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings,
bpmn_subprocess_mapping=self.bpmn_subprocess_mapping,
task_model_mapping=self.task_model_mapping,
)
task_service.update_all_tasks_from_spiff_tasks(spiff_tasks, [], start_time)
ProcessInstanceTmpService.add_event_to_process_instance(self.process_instance_model, event_type, task_guid=task_id)
@ -1267,10 +1319,18 @@ class ProcessInstanceProcessor:
deleted_tasks = processor.bpmn_process_instance.reset_from_task_id(UUID(to_task_guid))
spiff_tasks = processor.bpmn_process_instance.get_tasks()
for dt in deleted_tasks:
if str(dt.id) in processor.task_model_mapping:
del processor.task_model_mapping[str(dt.id)]
if str(dt.id) in processor.bpmn_subprocess_mapping:
del processor.bpmn_subprocess_mapping[str(dt.id)]
task_service = TaskService(
process_instance=processor.process_instance_model,
serializer=processor._serializer,
bpmn_definition_to_task_definitions_mappings=processor.bpmn_definition_to_task_definitions_mappings,
task_model_mapping=processor.task_model_mapping,
bpmn_subprocess_mapping=processor.bpmn_subprocess_mapping,
)
task_service.update_all_tasks_from_spiff_tasks(spiff_tasks, deleted_tasks, start_time, to_task_guid=to_task_guid)
@ -1485,6 +1545,8 @@ class ProcessInstanceProcessor:
serializer=self._serializer,
process_instance=self.process_instance_model,
bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings,
bpmn_subprocess_mapping=self.bpmn_subprocess_mapping,
task_model_mapping=self.task_model_mapping,
)
if execution_strategy is None:
@ -1508,6 +1570,7 @@ class ProcessInstanceProcessor:
save,
should_schedule_waiting_timer_events=should_schedule_waiting_timer_events,
)
self.task_model_mapping, self.bpmn_subprocess_mapping = task_model_delegate.get_guid_to_db_object_mappings()
self.check_all_tasks()
return task_runnability
@ -1684,6 +1747,8 @@ class ProcessInstanceProcessor:
serializer=self._serializer,
bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings,
run_started_at=run_started_at,
bpmn_subprocess_mapping=self.bpmn_subprocess_mapping,
task_model_mapping=self.task_model_mapping,
)
task_service.update_task_model(task_model, spiff_task)
JsonDataModel.insert_or_update_json_data_records(task_service.json_data_dicts)
@ -1799,6 +1864,8 @@ class ProcessInstanceProcessor:
process_instance=self.process_instance_model,
serializer=self._serializer,
bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings,
bpmn_subprocess_mapping=self.bpmn_subprocess_mapping,
task_model_mapping=self.task_model_mapping,
)
task_service.update_all_tasks_from_spiff_tasks(spiff_tasks, deleted_tasks, start_time)

View File

@ -111,10 +111,20 @@ class TaskService:
bpmn_definition_to_task_definitions_mappings: dict,
run_started_at: float | None = None,
force_update_definitions: bool = False,
task_model_mapping: dict[str, TaskModel] | None = None,
bpmn_subprocess_mapping: dict[str, BpmnProcessModel] | None = None,
) -> None:
self.process_instance = process_instance
self.bpmn_definition_to_task_definitions_mappings = bpmn_definition_to_task_definitions_mappings
self.serializer = serializer
self.task_model_mapping = task_model_mapping or {}
self.bpmn_subprocess_mapping = bpmn_subprocess_mapping or {}
self.bpmn_subprocess_id_mapping: dict[int, BpmnProcessModel] = {}
for _, bs in self.bpmn_subprocess_mapping.items():
self.bpmn_subprocess_id_mapping[bs.id] = bs
if self.process_instance.bpmn_process_id is not None:
self.bpmn_subprocess_id_mapping[self.process_instance.bpmn_process_id] = self.process_instance.bpmn_process
# this updates the definition ids for both tasks and bpmn_processes when they are updated
# in case definitions were changed for the same instances.
@ -200,9 +210,7 @@ class TaskService:
)
# we are not sure why task_model.bpmn_process can be None while task_model.bpmn_process_id actually has a valid value
bpmn_process = (
new_bpmn_process or task_model.bpmn_process or BpmnProcessModel.query.filter_by(id=task_model.bpmn_process_id).first()
)
bpmn_process = new_bpmn_process or task_model.bpmn_process or self.bpmn_subprocess_id_mapping[task_model.bpmn_process_id]
self.update_task_model(task_model, spiff_task)
bpmn_process_json_data = self.update_task_data_on_bpmn_process(bpmn_process, bpmn_process_instance=spiff_task.workflow)
@ -263,8 +271,8 @@ class TaskService:
self.bpmn_processes[bpmn_process.guid or "top_level"] = bpmn_process
if spiff_workflow.parent_task_id:
direct_parent_bpmn_process = BpmnProcessModel.query.filter_by(id=bpmn_process.direct_parent_process_id).first()
if spiff_workflow.parent_task_id and bpmn_process.direct_parent_process_id:
direct_parent_bpmn_process = self.bpmn_subprocess_id_mapping[bpmn_process.direct_parent_process_id]
self.update_bpmn_process(spiff_workflow.parent_workflow, direct_parent_bpmn_process)
if self.force_update_definitions is True:
@ -316,9 +324,7 @@ class TaskService:
task_model: TaskModel | None = TaskModel.query.filter_by(guid=spiff_task_guid).first()
bpmn_process = None
if task_model is None:
bpmn_process = self.task_bpmn_process(
spiff_task,
)
bpmn_process = self.task_bpmn_process(spiff_task)
task_definition = self.bpmn_definition_to_task_definitions_mappings[spiff_task.workflow.spec.name][
spiff_task.task_spec.name
]
@ -335,9 +341,9 @@ class TaskService:
self,
spiff_task: SpiffTask,
) -> BpmnProcessModel:
subprocess_guid, subprocess = self.__class__._task_subprocess(spiff_task)
subprocess_guid, spiff_subprocess = self.__class__._task_subprocess(spiff_task)
bpmn_process: BpmnProcessModel | None = None
if subprocess is None:
if spiff_subprocess is None:
bpmn_process = self.process_instance.bpmn_process
# This is the top level workflow, which has no guid
# check for bpmn_process_id because mypy doesn't realize bpmn_process can be None
@ -348,11 +354,13 @@ class TaskService:
spiff_workflow=spiff_workflow,
)
else:
bpmn_process = BpmnProcessModel.query.filter_by(guid=subprocess_guid).first()
bpmn_process = None
if subprocess_guid is not None:
bpmn_process = self.bpmn_subprocess_mapping.get(subprocess_guid)
if bpmn_process is None:
spiff_workflow = spiff_task.workflow
bpmn_process = self.add_bpmn_process(
bpmn_process_dict=self.serializer.to_dict(subprocess),
bpmn_process_dict=self.serializer.to_dict(spiff_subprocess),
top_level_process=self.process_instance.bpmn_process,
bpmn_process_guid=subprocess_guid,
spiff_workflow=spiff_workflow,
@ -382,10 +390,8 @@ class TaskService:
bpmn_process_dict.pop("subprocess_specs")
bpmn_process = None
if top_level_process is not None:
bpmn_process = BpmnProcessModel.query.filter_by(
top_level_process_id=top_level_process.id, guid=bpmn_process_guid
).first()
if top_level_process is not None and bpmn_process_guid is not None:
bpmn_process = self.bpmn_subprocess_mapping.get(bpmn_process_guid)
elif self.process_instance.bpmn_process_id is not None:
bpmn_process = self.process_instance.bpmn_process
@ -401,7 +407,7 @@ class TaskService:
if top_level_process is not None:
subprocesses = spiff_workflow.top_workflow.subprocesses
direct_bpmn_process_parent = top_level_process
direct_bpmn_process_parent: BpmnProcessModel | None = top_level_process
# BpmnWorkflows do not know their own guid so we have to cycle through subprocesses to find the guid that matches
# calling list(subprocesses) to make a copy of the keys so we can change subprocesses while iterating
@ -410,7 +416,7 @@ class TaskService:
for subprocess_guid in list(subprocesses):
subprocess = subprocesses[subprocess_guid]
if subprocess == spiff_workflow.parent_workflow:
direct_bpmn_process_parent = BpmnProcessModel.query.filter_by(guid=str(subprocess_guid)).first()
direct_bpmn_process_parent = self.bpmn_subprocess_mapping.get(str(subprocess_guid))
if direct_bpmn_process_parent is None:
raise BpmnProcessNotFoundError(
f"Could not find bpmn process with guid: {str(subprocess_guid)} "
@ -445,6 +451,10 @@ class TaskService:
spiff_workflow=spiff_workflow,
bpmn_process=bpmn_process,
)
if bpmn_process.guid is not None:
self.bpmn_subprocess_mapping[bpmn_process.guid] = bpmn_process
self.bpmn_subprocess_id_mapping[bpmn_process.id] = bpmn_process
return bpmn_process
def add_tasks_to_bpmn_process(

View File

@ -29,12 +29,14 @@ from spiffworkflow_backend.background_processing.celery_tasks.process_instance_t
from spiffworkflow_backend.data_stores.kkv import KKVDataStore
from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.helpers.spiff_enum import SpiffEnum
from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.future_task import FutureTaskModel
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
from spiffworkflow_backend.models.message_instance_correlation import MessageInstanceCorrelationRuleModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventType
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.assertion_service import safe_assertion
from spiffworkflow_backend.services.jinja_service import JinjaService
@ -249,6 +251,8 @@ class TaskModelSavingDelegate(EngineStepDelegate):
process_instance: ProcessInstanceModel,
bpmn_definition_to_task_definitions_mappings: dict,
secondary_engine_step_delegate: EngineStepDelegate | None = None,
task_model_mapping: dict[str, TaskModel] | None = None,
bpmn_subprocess_mapping: dict[str, BpmnProcessModel] | None = None,
) -> None:
self.secondary_engine_step_delegate = secondary_engine_step_delegate
self.process_instance = process_instance
@ -266,6 +270,8 @@ class TaskModelSavingDelegate(EngineStepDelegate):
serializer=self.serializer,
bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings,
run_started_at=time.time(),
task_model_mapping=task_model_mapping,
bpmn_subprocess_mapping=bpmn_subprocess_mapping,
)
def will_complete_task(self, spiff_task: SpiffTask) -> None:
@ -334,6 +340,9 @@ class TaskModelSavingDelegate(EngineStepDelegate):
def on_exception(self, bpmn_process_instance: BpmnWorkflow) -> None:
self.after_engine_steps(bpmn_process_instance)
def get_guid_to_db_object_mappings(self) -> tuple[dict[str, TaskModel], dict[str, BpmnProcessModel]]:
return (self.task_service.task_model_mapping, self.task_service.bpmn_subprocess_mapping)
def _should_update_task_model(self) -> bool:
"""No reason to save task model stuff if the process instance isn't persistent."""
return self.process_instance.persistence_level != "none"

View File

@ -0,0 +1,222 @@
<?xml version="1.0" encoding="UTF-8"?>
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" id="Definitions_96f6665" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="3.0.0-dev">
<bpmn:process id="Process_mi_sequential_expanded_sub_process_with_incrementing_script_task_xyw1h4u" isExecutable="true">
<bpmn:startEvent id="StartEvent_1">
<bpmn:outgoing>Flow_17db3yp</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_17db3yp" sourceRef="StartEvent_1" targetRef="Activity_0s00ezf" />
<bpmn:endEvent id="EndEvent_1">
<bpmn:incoming>Flow_0v0kh3k</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_0wx95ot" sourceRef="Activity_0s00ezf" targetRef="Gateway_13qwia7" />
<bpmn:scriptTask id="Activity_0s00ezf" name="Set Input Collection Size List">
<bpmn:incoming>Flow_17db3yp</bpmn:incoming>
<bpmn:outgoing>Flow_0wx95ot</bpmn:outgoing>
<bpmn:script>input_collection_size_list = [5, 10, 20, 40, 60] #, 160, 320]
loop_cnt = 0
loop_max = len(input_collection_size_list)</bpmn:script>
</bpmn:scriptTask>
<bpmn:scriptTask id="Activity_1li7d78" name="Create Input Collection">
<bpmn:incoming>Flow_1av87dx</bpmn:incoming>
<bpmn:outgoing>Flow_1a06mr7</bpmn:outgoing>
<bpmn:script>input_collection = []
for x in range(input_collection_size_list[loop_cnt]):
input_collection.append(x)
del(x)</bpmn:script>
</bpmn:scriptTask>
<bpmn:exclusiveGateway id="Gateway_13qwia7">
<bpmn:incoming>Flow_0wx95ot</bpmn:incoming>
<bpmn:incoming>Flow_0h1frjl</bpmn:incoming>
<bpmn:outgoing>Flow_1av87dx</bpmn:outgoing>
</bpmn:exclusiveGateway>
<bpmn:sequenceFlow id="Flow_1av87dx" sourceRef="Gateway_13qwia7" targetRef="Activity_1li7d78" />
<bpmn:subProcess id="Activity_1mg0q68">
<bpmn:incoming>Flow_09b6hjo</bpmn:incoming>
<bpmn:outgoing>Flow_1ssduxn</bpmn:outgoing>
<bpmn:multiInstanceLoopCharacteristics isSequential="true">
<bpmn:loopDataInputRef>input_collection</bpmn:loopDataInputRef>
<bpmn:loopDataOutputRef>output_collection</bpmn:loopDataOutputRef>
<bpmn:inputDataItem id="input_element" name="input_element" />
<bpmn:outputDataItem id="output_element" name="output_element" />
</bpmn:multiInstanceLoopCharacteristics>
<bpmn:startEvent id="Event_0rgjjkl">
<bpmn:outgoing>Flow_082jbuo</bpmn:outgoing>
</bpmn:startEvent>
<bpmn:sequenceFlow id="Flow_082jbuo" sourceRef="Event_0rgjjkl" targetRef="Activity_1akaqlb" />
<bpmn:endEvent id="Event_053gcfh">
<bpmn:incoming>Flow_02jeqnp</bpmn:incoming>
</bpmn:endEvent>
<bpmn:sequenceFlow id="Flow_02jeqnp" sourceRef="Activity_1akaqlb" targetRef="Event_053gcfh" />
<bpmn:scriptTask id="Activity_1akaqlb" name="Set Output Element">
<bpmn:incoming>Flow_082jbuo</bpmn:incoming>
<bpmn:outgoing>Flow_02jeqnp</bpmn:outgoing>
<bpmn:script>output_element = input_element</bpmn:script>
</bpmn:scriptTask>
</bpmn:subProcess>
<bpmn:sequenceFlow id="Flow_1a06mr7" sourceRef="Activity_1li7d78" targetRef="Event_1k9xqdt" />
<bpmn:sequenceFlow id="Flow_1ssduxn" sourceRef="Activity_1mg0q68" targetRef="Event_0mwdsn5" />
<bpmn:exclusiveGateway id="Gateway_1mz5u81" name="Exceeded Loop Count?" default="Flow_17ph1cp">
<bpmn:incoming>Flow_11bwcp9</bpmn:incoming>
<bpmn:outgoing>Flow_0v0kh3k</bpmn:outgoing>
<bpmn:outgoing>Flow_17ph1cp</bpmn:outgoing>
</bpmn:exclusiveGateway>
<bpmn:sequenceFlow id="Flow_0v0kh3k" name="Yes" sourceRef="Gateway_1mz5u81" targetRef="EndEvent_1">
<bpmn:conditionExpression>loop_cnt == loop_max</bpmn:conditionExpression>
</bpmn:sequenceFlow>
<bpmn:sequenceFlow id="Flow_17ph1cp" name="No" sourceRef="Gateway_1mz5u81" targetRef="Activity_09kc18u" />
<bpmn:intermediateThrowEvent id="Event_1k9xqdt" name="Start MI">
<bpmn:incoming>Flow_1a06mr7</bpmn:incoming>
<bpmn:outgoing>Flow_09b6hjo</bpmn:outgoing>
</bpmn:intermediateThrowEvent>
<bpmn:sequenceFlow id="Flow_09b6hjo" sourceRef="Event_1k9xqdt" targetRef="Activity_1mg0q68" />
<bpmn:intermediateThrowEvent id="Event_0mwdsn5" name="End MI">
<bpmn:incoming>Flow_1ssduxn</bpmn:incoming>
<bpmn:outgoing>Flow_1ydr641</bpmn:outgoing>
</bpmn:intermediateThrowEvent>
<bpmn:sequenceFlow id="Flow_1ydr641" sourceRef="Event_0mwdsn5" targetRef="Activity_0wvm7fd" />
<bpmn:sequenceFlow id="Flow_11bwcp9" sourceRef="Activity_0wvm7fd" targetRef="Gateway_1mz5u81" />
<bpmn:scriptTask id="Activity_0wvm7fd" name="Increment Count">
<bpmn:incoming>Flow_1ydr641</bpmn:incoming>
<bpmn:outgoing>Flow_11bwcp9</bpmn:outgoing>
<bpmn:script>loop_cnt = loop_cnt + 1</bpmn:script>
</bpmn:scriptTask>
<bpmn:sequenceFlow id="Flow_0h1frjl" sourceRef="Activity_09kc18u" targetRef="Gateway_13qwia7" />
<bpmn:scriptTask id="Activity_09kc18u" name="Clear MI Variables">
<bpmn:incoming>Flow_17ph1cp</bpmn:incoming>
<bpmn:outgoing>Flow_0h1frjl</bpmn:outgoing>
<bpmn:script>
del(output_collection)
# del(output_element)</bpmn:script>
</bpmn:scriptTask>
<bpmn:scriptTask id="Activity_0k3gwuu">
<bpmn:multiInstanceLoopCharacteristics isSequential="true">
<bpmn:loopDataInputRef>input_collection</bpmn:loopDataInputRef>
<bpmn:loopDataOutputRef>output_collection</bpmn:loopDataOutputRef>
<bpmn:inputDataItem id="input_element" name="input_element" />
<bpmn:outputDataItem id="output_element" name="output_element" />
</bpmn:multiInstanceLoopCharacteristics>
<bpmn:script>output_element = input_element</bpmn:script>
</bpmn:scriptTask>
</bpmn:process>
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="Process_mi_sequential_expanded_sub_process_with_incrementing_script_task_xyw1h4u">
<bpmndi:BPMNShape id="_BPMNShape_StartEvent_2" bpmnElement="StartEvent_1">
<dc:Bounds x="-188" y="159" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_14za570_di" bpmnElement="EndEvent_1">
<dc:Bounds x="1242" y="159" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_1nttchd_di" bpmnElement="Activity_0s00ezf">
<dc:Bounds x="-80" y="137" width="100" height="80" />
<bpmndi:BPMNLabel />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_1p4pvah_di" bpmnElement="Activity_1li7d78">
<dc:Bounds x="190" y="137" width="100" height="80" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Gateway_13qwia7_di" bpmnElement="Gateway_13qwia7" isMarkerVisible="true">
<dc:Bounds x="65" y="152" width="50" height="50" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_15v049s_di" bpmnElement="Activity_0k3gwuu">
<dc:Bounds x="585" y="350" width="100" height="80" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_1mg0q68_di" bpmnElement="Activity_1mg0q68" isExpanded="true">
<dc:Bounds x="450" y="77" width="370" height="200" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_0rgjjkl_di" bpmnElement="Event_0rgjjkl">
<dc:Bounds x="490" y="159" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_053gcfh_di" bpmnElement="Event_053gcfh">
<dc:Bounds x="742" y="159" width="36" height="36" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_1mg71ot_di" bpmnElement="Activity_1akaqlb">
<dc:Bounds x="580" y="137" width="100" height="80" />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_082jbuo_di" bpmnElement="Flow_082jbuo">
<di:waypoint x="526" y="177" />
<di:waypoint x="580" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_02jeqnp_di" bpmnElement="Flow_02jeqnp">
<di:waypoint x="680" y="177" />
<di:waypoint x="742" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNShape id="Gateway_1mz5u81_di" bpmnElement="Gateway_1mz5u81" isMarkerVisible="true">
<dc:Bounds x="1105" y="152" width="50" height="50" />
<bpmndi:BPMNLabel>
<dc:Bounds x="1092" y="209" width="77" height="27" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_1k9xqdt_di" bpmnElement="Event_1k9xqdt">
<dc:Bounds x="342" y="159" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="341" y="202" width="39" height="14" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Event_0mwdsn5_di" bpmnElement="Event_0mwdsn5">
<dc:Bounds x="882" y="159" width="36" height="36" />
<bpmndi:BPMNLabel>
<dc:Bounds x="883" y="202" width="35" height="14" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_0kerzww_di" bpmnElement="Activity_0wvm7fd">
<dc:Bounds x="960" y="137" width="100" height="80" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape id="Activity_0t3g787_di" bpmnElement="Activity_09kc18u">
<dc:Bounds x="1080" y="20" width="100" height="80" />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge id="Flow_17db3yp_di" bpmnElement="Flow_17db3yp">
<di:waypoint x="-152" y="177" />
<di:waypoint x="-80" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0wx95ot_di" bpmnElement="Flow_0wx95ot">
<di:waypoint x="20" y="177" />
<di:waypoint x="65" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_1av87dx_di" bpmnElement="Flow_1av87dx">
<di:waypoint x="115" y="177" />
<di:waypoint x="190" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_1a06mr7_di" bpmnElement="Flow_1a06mr7">
<di:waypoint x="290" y="177" />
<di:waypoint x="342" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_1ssduxn_di" bpmnElement="Flow_1ssduxn">
<di:waypoint x="820" y="177" />
<di:waypoint x="882" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0v0kh3k_di" bpmnElement="Flow_0v0kh3k">
<di:waypoint x="1155" y="177" />
<di:waypoint x="1242" y="177" />
<bpmndi:BPMNLabel>
<dc:Bounds x="1190" y="159" width="18" height="14" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_17ph1cp_di" bpmnElement="Flow_17ph1cp">
<di:waypoint x="1130" y="152" />
<di:waypoint x="1130" y="100" />
<bpmndi:BPMNLabel>
<dc:Bounds x="603" y="-8.000000000000028" width="15" height="14" />
</bpmndi:BPMNLabel>
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_09b6hjo_di" bpmnElement="Flow_09b6hjo">
<di:waypoint x="378" y="177" />
<di:waypoint x="450" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_1ydr641_di" bpmnElement="Flow_1ydr641">
<di:waypoint x="918" y="177" />
<di:waypoint x="960" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_11bwcp9_di" bpmnElement="Flow_11bwcp9">
<di:waypoint x="1060" y="177" />
<di:waypoint x="1105" y="177" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge id="Flow_0h1frjl_di" bpmnElement="Flow_0h1frjl">
<di:waypoint x="1130" y="20" />
<di:waypoint x="1130" y="-30" />
<di:waypoint x="90" y="-30" />
<di:waypoint x="90" y="152" />
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</bpmn:definitions>

View File

@ -306,19 +306,25 @@ class BaseTest:
process_model: ProcessModelInfo,
status: str | None = "not_started",
user: UserModel | None = None,
save_start_and_end_times: bool = True,
) -> ProcessInstanceModel:
if user is None:
user = self.find_or_create_user()
current_time = round(time.time())
start_in_seconds = None
end_in_seconds = None
if save_start_and_end_times:
start_in_seconds = current_time - (3600 * 1)
end_in_seconds = current_time - (3600 * 1 - 20)
process_instance = ProcessInstanceModel(
status=status,
process_initiator=user,
process_model_identifier=process_model.id,
process_model_display_name=process_model.display_name,
updated_at_in_seconds=round(time.time()),
start_in_seconds=current_time - (3600 * 1),
end_in_seconds=current_time - (3600 * 1 - 20),
start_in_seconds=start_in_seconds,
end_in_seconds=end_in_seconds,
)
db.session.add(process_instance)
db.session.commit()

View File

@ -133,6 +133,7 @@ class TestProcessModelsController(BaseTest):
mock_post.return_value.ok = True
mock_post.return_value.text = json.dumps(connector_response)
processor.do_engine_steps(save=True)
self.complete_next_manual_task(processor, execution_mode="synchronous")
self.complete_next_manual_task(processor, execution_mode="synchronous", data={"firstName": "Chuck"})
process_instance = ProcessInstanceModel.query.filter_by(id=process_instance_id).first()

View File

@ -1076,3 +1076,27 @@ class TestProcessInstanceProcessor(BaseTest):
assert process_instance.summary is not None
# mypy thinks this is unreachable but it is reachable. summary can be str | None
assert len(process_instance.summary) == 255 # type: ignore
# To test processing times with multiinstance subprocesses
# def test_large_multiinstance(
# self,
# app: Flask,
# client: FlaskClient,
# with_db_and_bpmn_file_cleanup: None,
# ) -> None:
# process_model = load_test_spec(
# process_model_id="test_group/multiinstance_with_subprocess_and_large_dataset",
# process_model_source_directory="multiinstance_with_subprocess_and_large_dataset",
# )
# process_instance = self.create_process_instance_from_process_model(
# process_model=process_model, save_start_and_end_times=False
# )
#
# processor = ProcessInstanceProcessor(process_instance)
# # start_time = time.time()
# processor.do_engine_steps(save=True, execution_strategy_name="greedy")
# # end_time = time.time()
# # duration = end_time - start_time
# # assert processor.process_instance_model.end_in_seconds is not None
# # duration = processor.process_instance_model.end_in_seconds - processor.process_instance_model.created_at_in_seconds
# # print(f"➡️ ➡️ ➡️ duration: {duration}")