From b89527ab808c23beed64b4fd8c3e1c6115fad1b0 Mon Sep 17 00:00:00 2001 From: jbirddog <100367399+jbirddog@users.noreply.github.com> Date: Thu, 4 May 2023 14:15:13 -0400 Subject: [PATCH] Integrate spiff-element-units 0.3.0 for (some) lazy call activities (#239) --- spiffworkflow-backend/poetry.lock | 28 +++++----- spiffworkflow-backend/pyproject.toml | 2 +- .../routes/tasks_controller.py | 2 +- .../services/element_units_service.py | 22 ++++++-- .../services/process_instance_processor.py | 53 ++++++++++++++++++- .../services/workflow_execution_service.py | 44 +++++++++------ .../unit/test_element_units_service.py | 8 +-- 7 files changed, 117 insertions(+), 42 deletions(-) diff --git a/spiffworkflow-backend/poetry.lock b/spiffworkflow-backend/poetry.lock index 9c0a426d7..7fc9d6b3f 100644 --- a/spiffworkflow-backend/poetry.lock +++ b/spiffworkflow-backend/poetry.lock @@ -3263,23 +3263,23 @@ test = ["pytest"] [[package]] name = "spiff-element-units" -version = "0.1.0" +version = "0.3.0" description = "" category = "main" optional = false -python-versions = ">=3.7" +python-versions = ">=3.9" files = [ - {file = "spiff_element_units-0.1.0-cp39-abi3-macosx_10_7_x86_64.whl", hash = "sha256:fc34e1012a922037cf5d04c154a37119bc1ba83cc536d79fde601da42703d5f7"}, - {file = "spiff_element_units-0.1.0-cp39-abi3-macosx_11_0_arm64.whl", hash = "sha256:664197124c2a81c780d83a60750ad4411dda22a31e0db7615007a3905393fa4b"}, - {file = "spiff_element_units-0.1.0-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:34409c2a1f24dfca99afafd3b1caa9e2fba66c81864954f7f9ebf8030bc632c8"}, - {file = "spiff_element_units-0.1.0-cp39-abi3-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:dcc1307447f30f597d31224d855c979f5c8cca5906792cbf7d9418ee2bcac54b"}, - {file = "spiff_element_units-0.1.0-cp39-abi3-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:f40788203884db9d15e1e2c6b5d7b019b38606750f3df75f66d6c14fe492b5b4"}, - {file = "spiff_element_units-0.1.0-cp39-abi3-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:78afa90e8ee48dfe84b8accb3399212297cb2b05f7e5288ae57614f68a2cffd8"}, - {file = "spiff_element_units-0.1.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:aab909b3d8f896eb2f2ed64273a7e013564a708e875883dafe76cdb34e459cb3"}, - {file = "spiff_element_units-0.1.0-cp39-abi3-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:1a264c4ad717d83fe1d7b31d4c9a26143a3a9eff9cfdb09216f4fc1ef029e178"}, - {file = "spiff_element_units-0.1.0-cp39-abi3-win32.whl", hash = "sha256:9b2e25b8b18ae006c39cd0b8bac11b08d60bf0e53b60a9cca9a3cec3750f0176"}, - {file = "spiff_element_units-0.1.0-cp39-abi3-win_amd64.whl", hash = "sha256:825594fa95496db3a9a7826c9cbf95b90b6a5676838bbd9fe23c5ff32a2d2920"}, - {file = "spiff_element_units-0.1.0.tar.gz", hash = "sha256:807e207562220f350cd0f0b6a46484c7b976c4effe4b194701179add7abe871a"}, + {file = "spiff_element_units-0.3.0-cp39-abi3-macosx_10_7_x86_64.whl", hash = "sha256:35d2ede30e4d2fc715882c44b19e70e7289e1bb6fdff213198ba1e5d073987d1"}, + {file = "spiff_element_units-0.3.0-cp39-abi3-macosx_11_0_arm64.whl", hash = "sha256:d2a9dde576144ba040ca7fb11ee5487e7c7507b8ee7f1c6c12d24066b67c24c7"}, + {file = "spiff_element_units-0.3.0-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:15e6d02e5fd59c99484afec41f7f96272e38b7f933e17be9a29b2e3bd43b8396"}, + {file = "spiff_element_units-0.3.0-cp39-abi3-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:49e2f6472d5eb5c90978295042e2b3d000e3b88e833fd655d3fe30a1ba7d225b"}, + {file = "spiff_element_units-0.3.0-cp39-abi3-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:eed527049bded8ea25ead384547e1ea35e99fe97353c78fd4ad014c8cd2b12b8"}, + {file = "spiff_element_units-0.3.0-cp39-abi3-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:63a0f60ce401641dba142a95952330bb18f6a9f77f1ff99f8ef2a392e08774b4"}, + {file = "spiff_element_units-0.3.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:43b13dcd21f722758c5cc6e16c317680dd7011e0b9aca83fd478c28b13eb25e0"}, + {file = "spiff_element_units-0.3.0-cp39-abi3-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:2625b15d2c705fe07f6e472f75142e0f036d3bd567e6e5e946f75fd96dc8396a"}, + {file = "spiff_element_units-0.3.0-cp39-abi3-win32.whl", hash = "sha256:fe07bbe1e6e036ffb1fd78d719f5e83c160e657cf5e6eeee5938946e9b940116"}, + {file = "spiff_element_units-0.3.0-cp39-abi3-win_amd64.whl", hash = "sha256:beacf2857ba196b39a66f481e9989513e4df607e6d2157d4c1c1e463890ca806"}, + {file = "spiff_element_units-0.3.0.tar.gz", hash = "sha256:5c740c70adf7e0fc39c9a3a199c4f1d5d5941d61b28c90637b64df4dc3df57dd"}, ] [[package]] @@ -3938,4 +3938,4 @@ testing = ["big-O", "flake8 (<5)", "jaraco.functools", "jaraco.itertools", "more [metadata] lock-version = "2.0" python-versions = ">=3.9,<3.12" -content-hash = "53f3340f73de770b4fbebff3fcd396cdf1bc2c082b929ade350f31a9df6c3860" +content-hash = "bc4e0dabc679dd84e7803930d043a934276298715657bf85226ec9541eeebfb9" diff --git a/spiffworkflow-backend/pyproject.toml b/spiffworkflow-backend/pyproject.toml index b8ce91156..c7edf450b 100644 --- a/spiffworkflow-backend/pyproject.toml +++ b/spiffworkflow-backend/pyproject.toml @@ -86,7 +86,7 @@ prometheus-flask-exporter = "^0.22.3" safety = "^2.3.5" sqlalchemy = "^2.0.7" marshmallow-sqlalchemy = "^0.29.0" -spiff-element-units = "^0.1.0" +spiff-element-units = "^0.3.0" [tool.poetry.dev-dependencies] pytest = "^7.1.2" diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py b/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py index 9899ae09f..66f93bb46 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/routes/tasks_controller.py @@ -437,7 +437,7 @@ def _interstitial_stream(process_instance: ProcessInstanceModel) -> Generator[st ) yield f"data: {current_app.json.dumps(api_error)} \n\n" return - processor.bpmn_process_instance.refresh_waiting_tasks() + processor.refresh_waiting_tasks() ready_engine_task_count = get_ready_engine_step_count(processor.bpmn_process_instance) tasks = get_reportable_tasks() if ready_engine_task_count == 0: diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/element_units_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/element_units_service.py index 6da5bb14b..67bba7a13 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/element_units_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/element_units_service.py @@ -29,16 +29,23 @@ class ElementUnitsService: # for now we are importing inside each of these functions, not sure the best # way to do this in an overall feature flagged strategy but this gets things # moving - import spiff_element_units # type: ignore + import spiff_element_units + + cache_dir = cls._cache_dir() + if cache_dir is None: + # make mypy happy + return None bpmn_spec_json = json.dumps(bpmn_spec_dict) - spiff_element_units.cache_element_units_for_workflow(cls._cache_dir(), cache_key, bpmn_spec_json) + spiff_element_units.cache_element_units_for_workflow(cache_dir, cache_key, bpmn_spec_json) except Exception as e: current_app.logger.exception(e) return None @classmethod - def workflow_from_cached_element_unit(cls, cache_key: str, element_id: str) -> Optional[BpmnSpecDict]: + def workflow_from_cached_element_unit( + cls, cache_key: str, process_id: str, element_id: str + ) -> Optional[BpmnSpecDict]: if not cls._enabled(): return None @@ -48,8 +55,15 @@ class ElementUnitsService: # moving import spiff_element_units + cache_dir = cls._cache_dir() + if cache_dir is None: + # make mypy happy + return None + + current_app.logger.info(f"Checking element unit cache @ {cache_key} :: '{process_id}' - '{element_id}'") + bpmn_spec_json = spiff_element_units.workflow_from_cached_element_unit( - cls._cache_dir(), cache_key, element_id + cache_dir, cache_key, process_id, element_id ) return json.loads(bpmn_spec_json) # type: ignore except Exception as e: diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index 938c7868f..f806eb481 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -680,6 +680,7 @@ class ProcessInstanceProcessor: element_unit_process_dict = ElementUnitsService.workflow_from_cached_element_unit( full_process_model_hash, bpmn_process_definition.bpmn_identifier, + bpmn_process_definition.bpmn_identifier, ) if element_unit_process_dict is not None: spiff_bpmn_process_dict["spec"] = element_unit_process_dict["spec"] @@ -693,6 +694,10 @@ class ProcessInstanceProcessor: bpmn_subprocesses = BpmnProcessModel.query.filter_by(top_level_process_id=bpmn_process.id).all() bpmn_subprocess_id_to_guid_mappings = {} for bpmn_subprocess in bpmn_subprocesses: + subprocess_identifier = bpmn_subprocess.bpmn_process_definition.bpmn_identifier + if subprocess_identifier not in spiff_bpmn_process_dict["subprocess_specs"]: + current_app.logger.info(f"Deferring subprocess spec: '{subprocess_identifier}'") + continue bpmn_subprocess_id_to_guid_mappings[bpmn_subprocess.id] = bpmn_subprocess.guid single_bpmn_process_dict = cls._get_bpmn_process_dict(bpmn_subprocess) spiff_bpmn_process_dict["subprocesses"][bpmn_subprocess.guid] = single_bpmn_process_dict @@ -1544,6 +1549,50 @@ class ProcessInstanceProcessor: db.session.add(message_instance) db.session.commit() + def element_unit_specs_loader(self, process_id: str, element_id: str) -> Optional[Dict[str, Any]]: + full_process_model_hash = self.process_instance_model.bpmn_process_definition.full_process_model_hash + if full_process_model_hash is None: + return None + + element_unit_process_dict = ElementUnitsService.workflow_from_cached_element_unit( + full_process_model_hash, + process_id, + element_id, + ) + + if element_unit_process_dict is not None: + spec_dict = element_unit_process_dict["spec"] + subprocess_specs_dict = element_unit_process_dict["subprocess_specs"] + + restored_specs = {k: self.wf_spec_converter.restore(v) for k, v in subprocess_specs_dict.items()} + restored_specs[spec_dict["name"]] = self.wf_spec_converter.restore(spec_dict) + + return restored_specs + + return None + + def lazy_load_subprocess_specs(self) -> None: + tasks = self.bpmn_process_instance.get_tasks(TaskState.DEFINITE_MASK) + loaded_specs = set(self.bpmn_process_instance.subprocess_specs.keys()) + for task in tasks: + if task.task_spec.spec_type != "Call Activity": + continue + spec_to_check = task.task_spec.spec + + if spec_to_check not in loaded_specs: + lazy_subprocess_specs = self.element_unit_specs_loader(spec_to_check, spec_to_check) + if lazy_subprocess_specs is None: + continue + + for name, spec in lazy_subprocess_specs.items(): + if name not in loaded_specs: + self.bpmn_process_instance.subprocess_specs[name] = spec + loaded_specs.add(name) + + def refresh_waiting_tasks(self) -> None: + self.lazy_load_subprocess_specs() + self.bpmn_process_instance.refresh_waiting_tasks() + def do_engine_steps( self, exit_at: None = None, @@ -1577,7 +1626,9 @@ class ProcessInstanceProcessor: "SPIFFWORKFLOW_BACKEND_ENGINE_STEP_DEFAULT_STRATEGY_WEB has not been set" ) - execution_strategy = execution_strategy_named(execution_strategy_name, task_model_delegate) + execution_strategy = execution_strategy_named( + execution_strategy_name, task_model_delegate, self.lazy_load_subprocess_specs + ) execution_service = WorkflowExecutionService( self.bpmn_process_instance, self.process_instance_model, diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py index 13ac5a7fd..0f2071b25 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py @@ -3,7 +3,10 @@ from __future__ import annotations import copy import time from abc import abstractmethod +from typing import Any from typing import Callable +from typing import Dict +from typing import Optional from uuid import UUID from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflowSerializer # type: ignore @@ -74,12 +77,16 @@ class EngineStepDelegate: pass +SubprocessSpecLoader = Callable[[], Optional[Dict[str, Any]]] + + class ExecutionStrategy: """Interface of sorts for a concrete execution strategy.""" - def __init__(self, delegate: EngineStepDelegate): + def __init__(self, delegate: EngineStepDelegate, subprocess_spec_loader: SubprocessSpecLoader): """__init__.""" self.delegate = delegate + self.subprocess_spec_loader = subprocess_spec_loader @abstractmethod def spiff_run(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None: @@ -92,7 +99,7 @@ class ExecutionStrategy: self.delegate.save(bpmn_process_instance) def get_ready_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> list[SpiffTask]: - return list( + tasks = list( [ t for t in bpmn_process_instance.get_tasks(TaskState.READY) @@ -100,6 +107,12 @@ class ExecutionStrategy: ] ) + if len(tasks) > 0: + self.subprocess_spec_loader() + tasks = [tasks[0]] + + return tasks + class TaskModelSavingDelegate(EngineStepDelegate): """Engine step delegate that takes care of saving a task model to the database. @@ -263,19 +276,14 @@ class GreedyExecutionStrategy(ExecutionStrategy): spiff.refresh_waiting_tasks is the thing that pushes some waiting tasks to READY. """ - self.bpmn_process_instance.do_engine_steps( - exit_at=exit_at, - will_complete_task=self.delegate.will_complete_task, - did_complete_task=self.delegate.did_complete_task, - ) - - self.bpmn_process_instance.refresh_waiting_tasks() - ready_tasks = self.bpmn_process_instance.get_tasks(TaskState.READY) - non_human_waiting_task = next( - (p for p in ready_tasks if p.task_spec.spec_type not in ["User Task", "Manual Task"]), None - ) - if non_human_waiting_task is not None: - self.run_until_user_input_required(exit_at) + engine_steps = self.get_ready_engine_steps(self.bpmn_process_instance) + while engine_steps: + for spiff_task in engine_steps: + self.delegate.will_complete_task(spiff_task) + spiff_task.run() + self.delegate.did_complete_task(spiff_task) + self.bpmn_process_instance.refresh_waiting_tasks() + engine_steps = self.get_ready_engine_steps(self.bpmn_process_instance) class RunUntilServiceTaskExecutionStrategy(ExecutionStrategy): @@ -338,7 +346,9 @@ class OneAtATimeExecutionStrategy(ExecutionStrategy): self.delegate.after_engine_steps(bpmn_process_instance) -def execution_strategy_named(name: str, delegate: EngineStepDelegate) -> ExecutionStrategy: +def execution_strategy_named( + name: str, delegate: EngineStepDelegate, spec_loader: SubprocessSpecLoader +) -> ExecutionStrategy: cls = { "greedy": GreedyExecutionStrategy, "run_until_service_task": RunUntilServiceTaskExecutionStrategy, @@ -346,7 +356,7 @@ def execution_strategy_named(name: str, delegate: EngineStepDelegate) -> Executi "one_at_a_time": OneAtATimeExecutionStrategy, }[name] - return cls(delegate) # type: ignore + return cls(delegate, spec_loader) # type: ignore ProcessInstanceCompleter = Callable[[BpmnWorkflow], None] diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_element_units_service.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_element_units_service.py index d07393d5d..6551ea10b 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_element_units_service.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_element_units_service.py @@ -93,7 +93,7 @@ class TestElementUnitsService(BaseTest): self, app_disabled: Flask, ) -> None: - result = ElementUnitsService.workflow_from_cached_element_unit("", "") + result = ElementUnitsService.workflow_from_cached_element_unit("", "", "") assert result is None def test_can_write_to_cache( @@ -122,12 +122,12 @@ class TestElementUnitsService(BaseTest): example_specs_dict: BpmnSpecDict, ) -> None: ElementUnitsService.cache_element_units_for_workflow("testing", example_specs_dict) - cached_specs_dict = ElementUnitsService.workflow_from_cached_element_unit("testing", "no_tasks") - assert cached_specs_dict == example_specs_dict + cached_specs_dict = ElementUnitsService.workflow_from_cached_element_unit("testing", "no_tasks", "no_tasks") + assert cached_specs_dict["spec"]["name"] == example_specs_dict["spec"]["name"] # type: ignore def test_reading_element_unit_for_uncached_process_returns_none( self, app_enabled_tmp_cache_dir: Flask, ) -> None: - cached_specs_dict = ElementUnitsService.workflow_from_cached_element_unit("testing", "no_tasks") + cached_specs_dict = ElementUnitsService.workflow_from_cached_element_unit("testing", "no_tasks", "") assert cached_specs_dict is None