Omit completed subprocesses (#1327)

* updates to avoid loading completed subprocesses when constructing a processor w/ burnettk

* bpmn unit test generator in controller test is passing w/ burnettk

* most tests are passing again w/ burnettk

* tests are all passing now w/ burnettk

* added an assertion to the reset process instance test to ensure task data does come back w/ burnettk

* removed debug benchmark stuff w/ burnettk

* pyl w/ burnettk

---------

Co-authored-by: jasquat <jasquat@users.noreply.github.com>
jasquat committed 2024-04-03 16:17:14 +00:00
parent d7bd03bb69
commit b35ab336f5
8 changed files with 59 additions and 20 deletions
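
The change in one sentence: ProcessInstanceProcessor gains an include_completed_subprocesses flag (default False) alongside the existing include_task_data_for_completed_tasks, so routine rehydration skips completed subprocesses entirely, and callers that need a full serialization opt in explicitly. A minimal usage sketch, assuming a process instance has already been looked up (the lookup is not part of this diff):

    # fast path: day-to-day execution skips completed subprocesses and
    # task data for tasks in terminal states
    processor = ProcessInstanceProcessor(process_instance)

    # full rehydration: needed wherever the complete bpmn process dict is
    # serialized (migrations, bpmn unit test generation, process reset)
    processor = ProcessInstanceProcessor(
        process_instance,
        include_task_data_for_completed_tasks=True,
        include_completed_subprocesses=True,
    )
    bpmn_process_dict = processor.serialize()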

@@ -3031,7 +3031,7 @@ doc = ["sphinx", "sphinx_rtd_theme"]
 type = "git"
 url = "https://github.com/sartography/SpiffWorkflow"
 reference = "main"
-resolved_reference = "86cb84d29cb25c1a5a407a702ef35cf7b469df6b"
+resolved_reference = "c1150c68ae0eb342ed13b57544e1bb0749255cb5"
 
 [[package]]
 name = "spiffworkflow-connector-command"

@@ -16,7 +16,9 @@ class Version4(DataMigrationBase):
     def run(cls, process_instance: ProcessInstanceModel) -> None:
         # return None
         try:
-            processor = ProcessInstanceProcessor(process_instance)
+            processor = ProcessInstanceProcessor(
+                process_instance, include_task_data_for_completed_tasks=True, include_completed_subprocesses=True
+            )
             bpmn_process_dict = processor.serialize()
             update_data_objects(bpmn_process_dict)
             ProcessInstanceProcessor.persist_bpmn_process_dict(

@@ -0,0 +1,14 @@
+import time
+from collections.abc import Generator
+from contextlib import contextmanager
+
+from flask import current_app
+
+
+@contextmanager
+def benchmark(message: str) -> Generator:
+    """Benchmark method useful for debugging slow stuff."""
+    t1 = time.perf_counter()
+    yield
+    t2 = time.perf_counter()
+    current_app.logger.debug(f"{message}, Time={t2 - t1}")
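
The helper above is a plain contextmanager: wrap any suspect block and the elapsed time is logged at debug level via current_app.logger, so it only works inside a Flask application context. A sketch of intended use (the wrapped call is an arbitrary example, not part of this diff):

    with benchmark("serialize process instance"):
        bpmn_process_dict = processor.serialize()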

@@ -367,7 +367,9 @@ def process_model_test_generate(modified_process_model_identifier: str, body: di
     test_case_identifier = body.get("test_case_identifier", f"test_case_for_process_instance_{process_instance_id}")
     process_instance = _find_process_instance_by_id_or_raise(int(process_instance_id))
-    processor = ProcessInstanceProcessor(process_instance, include_task_data_for_completed_tasks=True)
+    processor = ProcessInstanceProcessor(
+        process_instance, include_task_data_for_completed_tasks=True, include_completed_subprocesses=True
+    )
     process_instance_dict = processor.serialize()
     test_case_dict = ProcessModelTestGeneratorService.generate_test_from_process_instance_dict(
         process_instance_dict, test_case_identifier=str(test_case_identifier)

@@ -421,6 +421,7 @@ class ProcessInstanceProcessor:
         process_id_to_run: str | None = None,
         additional_processing_identifier: str | None = None,
         include_task_data_for_completed_tasks: bool = False,
+        include_completed_subprocesses: bool = False,
     ) -> None:
         """Create a Workflow Processor based on the serialized information available in the process_instance model."""
         self._script_engine = script_engine or self.__class__._default_script_engine
@@ -430,6 +431,7 @@
             process_instance_model=process_instance_model,
             process_id_to_run=process_id_to_run,
             include_task_data_for_completed_tasks=include_task_data_for_completed_tasks,
+            include_completed_subprocesses=include_completed_subprocesses,
         )
 
     def setup_processor_with_process_instance(
@@ -437,6 +439,7 @@
         process_instance_model: ProcessInstanceModel,
         process_id_to_run: str | None = None,
         include_task_data_for_completed_tasks: bool = False,
+        include_completed_subprocesses: bool = False,
     ) -> None:
         tld = current_app.config["THREAD_LOCAL_DATA"]
         tld.process_instance_id = process_instance_model.id
@@ -480,6 +483,8 @@
                 process_instance_model,
                 bpmn_process_spec,
                 subprocesses=subprocesses,
+                include_task_data_for_completed_tasks=include_task_data_for_completed_tasks,
+                include_completed_subprocesses=include_completed_subprocesses,
             )
 
         self.set_script_engine(self.bpmn_process_instance, self._script_engine)
@@ -675,11 +680,11 @@ class ProcessInstanceProcessor:
         include_task_data_for_completed_tasks: bool = False,
     ) -> None:
         json_data_hashes = set()
-        states_to_not_rehydrate_data: list[str] = []
-        if include_task_data_for_completed_tasks:
-            states_to_not_rehydrate_data = ["COMPLETED", "CANCELLED", "ERROR"]
+        states_to_exclude_from_rehydration: list[str] = []
+        if not include_task_data_for_completed_tasks:
+            states_to_exclude_from_rehydration = ["COMPLETED", "CANCELLED", "ERROR"]
         for task in tasks:
-            if task.state not in states_to_not_rehydrate_data:
+            if task.state not in states_to_exclude_from_rehydration:
                 json_data_hashes.add(task.json_data_hash)
         json_data_records = JsonDataModel.query.filter(JsonDataModel.hash.in_(json_data_hashes)).all()  # type: ignore
         json_data_mappings = {}
@@ -692,7 +697,7 @@
                 tasks_dict = spiff_bpmn_process_dict["subprocesses"][bpmn_subprocess_guid]["tasks"]
             tasks_dict[task.guid] = task.properties_json
             task_data = {}
-            if task.state not in states_to_not_rehydrate_data:
+            if task.state not in states_to_exclude_from_rehydration:
                 task_data = json_data_mappings[task.json_data_hash]
             tasks_dict[task.guid]["data"] = task_data
@@ -702,6 +707,7 @@
         process_instance_model: ProcessInstanceModel,
         bpmn_definition_to_task_definitions_mappings: dict,
         include_task_data_for_completed_tasks: bool = False,
+        include_completed_subprocesses: bool = False,
     ) -> dict:
         if process_instance_model.bpmn_process_definition_id is None:
             return {}
@@ -767,7 +773,14 @@
             )
             spiff_bpmn_process_dict.update(single_bpmn_process_dict)
 
-            bpmn_subprocesses = BpmnProcessModel.query.filter_by(top_level_process_id=bpmn_process.id).all()
+            bpmn_subprocesses_query = BpmnProcessModel.query.filter_by(top_level_process_id=bpmn_process.id)
+            if not include_completed_subprocesses:
+                bpmn_subprocesses_query = bpmn_subprocesses_query.join(
+                    TaskModel, TaskModel.guid == BpmnProcessModel.guid
+                ).filter(
+                    TaskModel.state.not_in(["COMPLETED", "ERROR", "CANCELLED"])  # type: ignore
+                )
+            bpmn_subprocesses = bpmn_subprocesses_query.all()
             bpmn_subprocess_id_to_guid_mappings = {}
             for bpmn_subprocess in bpmn_subprocesses:
                 subprocess_identifier = bpmn_subprocess.bpmn_process_definition.bpmn_identifier
@@ -820,6 +833,7 @@ class ProcessInstanceProcessor:
         spec: BpmnProcessSpec | None = None,
         subprocesses: IdToBpmnProcessSpecMapping | None = None,
         include_task_data_for_completed_tasks: bool = False,
+        include_completed_subprocesses: bool = False,
     ) -> tuple[BpmnWorkflow, dict, dict]:
         full_bpmn_process_dict = {}
         bpmn_definition_to_task_definitions_mappings: dict = {}
@@ -833,6 +847,7 @@
             full_bpmn_process_dict = ProcessInstanceProcessor._get_full_bpmn_process_dict(
                 process_instance_model,
                 bpmn_definition_to_task_definitions_mappings,
+                include_completed_subprocesses=include_completed_subprocesses,
                 include_task_data_for_completed_tasks=include_task_data_for_completed_tasks,
             )
             # FIXME: the from_dict entrypoint in spiff will one day do this copy instead
@@ -1247,7 +1262,9 @@ class ProcessInstanceProcessor:
         ProcessInstanceTmpService.add_event_to_process_instance(
             process_instance, ProcessInstanceEventType.process_instance_rewound_to_task.value, task_guid=to_task_guid
         )
 
-        processor = ProcessInstanceProcessor(process_instance, include_task_data_for_completed_tasks=True)
+        processor = ProcessInstanceProcessor(
+            process_instance, include_task_data_for_completed_tasks=True, include_completed_subprocesses=True
+        )
         deleted_tasks = processor.bpmn_process_instance.reset_from_task_id(UUID(to_task_guid))
         spiff_tasks = processor.bpmn_process_instance.get_tasks()
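
Two behavior changes in the hunks above are easy to miss. First, the old rehydration guard was inverted: task data was skipped for COMPLETED/CANCELLED/ERROR tasks precisely when include_task_data_for_completed_tasks was True. The rename to states_to_exclude_from_rehydration plus the added "not" fixes the logic and makes the intent readable. A distilled sketch of the fixed predicate (the helper name is illustrative, not from the diff):

    TERMINAL_STATES = ["COMPLETED", "CANCELLED", "ERROR"]

    def should_rehydrate_task_data(task_state: str, include_task_data_for_completed_tasks: bool) -> bool:
        # only skip task data for terminal-state tasks when the caller
        # did not ask for it; previously the condition was reversed
        states_to_exclude = [] if include_task_data_for_completed_tasks else TERMINAL_STATES
        return task_state not in states_to_exclude

Second, subprocess filtering now happens in SQL: unless include_completed_subprocesses is set, the query joins TaskModel on the subprocess guid and drops subprocesses whose task is in a terminal state, so completed subprocesses are never loaded at all.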

@@ -2030,11 +2030,10 @@ class TestProcessApi(BaseTest):
         )
         assert response.status_code == 400
         assert process_instance.status == "error"
-        processor = ProcessInstanceProcessor(process_instance)
+        processor = ProcessInstanceProcessor(process_instance, include_task_data_for_completed_tasks=True)
         spiff_task = processor.get_task_by_bpmn_identifier("script_task_two", processor.bpmn_process_instance)
         assert spiff_task is not None
-        # TODO: remove these 2 lines if we enable no task data on rehydration again from pr-661
         assert spiff_task.state == TaskState.ERROR
         assert spiff_task.data == {"my_var": "THE VAR"}
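
The new flag matters here because ERROR is one of the terminal states now excluded from data rehydration by default, so without include_task_data_for_completed_tasks=True the final assertion would see empty task data. Illustrative only (not part of the diff):

    processor = ProcessInstanceProcessor(process_instance)  # default flags
    spiff_task = processor.get_task_by_bpmn_identifier("script_task_two", processor.bpmn_process_instance)
    assert spiff_task.data == {}  # data for the ERROR task was not rehydrated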

@@ -110,7 +110,9 @@ class TestProcessInstanceMigrator(BaseTest):
         process_instance = ProcessInstanceModel.query.filter_by(id=process_instance.id).first()
 
         # ensure data was imported correctly and is in expected state
-        processor = ProcessInstanceProcessor(process_instance)
+        processor = ProcessInstanceProcessor(
+            process_instance, include_task_data_for_completed_tasks=True, include_completed_subprocesses=True
+        )
         bpmn_process_dict_version_3_after_import = processor.serialize()
         self.round_last_state_change(bpmn_process_dict_version_3)
         self.round_last_state_change(bpmn_process_dict_version_3_after_import)
@@ -125,7 +127,9 @@ class TestProcessInstanceMigrator(BaseTest):
         Version4.run(process_instance)
         update_data_objects(bpmn_process_dict_version_4_from_spiff)
         process_instance = ProcessInstanceModel.query.filter_by(id=process_instance.id).first()
-        processor = ProcessInstanceProcessor(process_instance)
+        processor = ProcessInstanceProcessor(
+            process_instance, include_task_data_for_completed_tasks=True, include_completed_subprocesses=True
+        )
         bpmn_process_dict_version_4 = processor.serialize()
         self.round_last_state_change(bpmn_process_dict_version_4)
         self.round_last_state_change(bpmn_process_dict_version_4_from_spiff)

@@ -209,7 +209,7 @@ class TestProcessInstanceProcessor(BaseTest):
         processor.do_engine_steps(save=True)
 
         # ensure this does not raise
-        processor = ProcessInstanceProcessor(process_instance)
+        processor = ProcessInstanceProcessor(process_instance, include_completed_subprocesses=True)
 
         # this task will be found within subprocesses
         spiff_task = processor.__class__.get_task_by_bpmn_identifier("level_3_script_task", processor.bpmn_process_instance)
@@ -330,8 +330,6 @@ class TestProcessInstanceProcessor(BaseTest):
         task_model_to_reset_to = all_task_models_matching_top_level_subprocess_script[0]
         assert task_model_to_reset_to is not None
         assert len(process_instance.human_tasks) == 3, "expected 3 human tasks before reset"
-        processor = ProcessInstanceProcessor(process_instance)
-        processor = ProcessInstanceProcessor(process_instance)
         ProcessInstanceProcessor.reset_process(process_instance, task_model_to_reset_to.guid)
         assert len(process_instance.human_tasks) == 2, "still expected 2 human tasks after reset"
@@ -348,6 +346,8 @@ class TestProcessInstanceProcessor(BaseTest):
             task for task in ready_or_waiting_tasks if task.task_spec.name == "top_level_subprocess_script"
         )
         assert top_level_subprocess_script_spiff_task is not None
+        # make sure we did not remove the data during the reset which can happen if include_task_data_for_completed_tasks is False
+        assert top_level_subprocess_script_spiff_task.data == {"set_in_top_level_script": 1}
         processor.resume()
         assert (
             len(process_instance.human_tasks) == 2
@@ -412,7 +412,6 @@ class TestProcessInstanceProcessor(BaseTest):
             "manual_task_1", processor.bpmn_process_instance
         )
         processor.suspend()
-        processor = ProcessInstanceProcessor(process_instance)
         ProcessInstanceProcessor.reset_process(process_instance, str(reset_to_spiff_task.id))
         process_instance = ProcessInstanceModel.query.filter_by(id=process_instance.id).first()
         human_task_one = process_instance.active_human_tasks[0]
@@ -527,9 +526,11 @@ class TestProcessInstanceProcessor(BaseTest):
         # recreate variables to ensure all bpmn json was recreated from scratch from the db
         process_instance_relookup = ProcessInstanceModel.query.filter_by(id=process_instance.id).first()
-        processor_final = ProcessInstanceProcessor(process_instance_relookup)
-        processor_final.do_engine_steps(save=True, execution_strategy_name="greedy")
+        processor_last_tasks = ProcessInstanceProcessor(process_instance_relookup)
+        processor_last_tasks.do_engine_steps(save=True, execution_strategy_name="greedy")
+        process_instance_relookup = ProcessInstanceModel.query.filter_by(id=process_instance.id).first()
+        processor_final = ProcessInstanceProcessor(process_instance_relookup, include_completed_subprocesses=True)
 
         assert process_instance_relookup.status == "complete"
         data_set_1 = {"set_in_top_level_script": 1}