Integrate spiff-element-units 0.3.0 for (some) lazy call activities (#239)

This commit is contained in:
jbirddog 2023-05-04 14:15:13 -04:00 committed by GitHub
parent 60db9db296
commit b89527ab80
7 changed files with 117 additions and 42 deletions

View File

@ -3263,23 +3263,23 @@ test = ["pytest"]
[[package]]
name = "spiff-element-units"
version = "0.1.0"
version = "0.3.0"
description = ""
category = "main"
optional = false
python-versions = ">=3.7"
python-versions = ">=3.9"
files = [
{file = "spiff_element_units-0.1.0-cp39-abi3-macosx_10_7_x86_64.whl", hash = "sha256:fc34e1012a922037cf5d04c154a37119bc1ba83cc536d79fde601da42703d5f7"},
{file = "spiff_element_units-0.1.0-cp39-abi3-macosx_11_0_arm64.whl", hash = "sha256:664197124c2a81c780d83a60750ad4411dda22a31e0db7615007a3905393fa4b"},
{file = "spiff_element_units-0.1.0-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:34409c2a1f24dfca99afafd3b1caa9e2fba66c81864954f7f9ebf8030bc632c8"},
{file = "spiff_element_units-0.1.0-cp39-abi3-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:dcc1307447f30f597d31224d855c979f5c8cca5906792cbf7d9418ee2bcac54b"},
{file = "spiff_element_units-0.1.0-cp39-abi3-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:f40788203884db9d15e1e2c6b5d7b019b38606750f3df75f66d6c14fe492b5b4"},
{file = "spiff_element_units-0.1.0-cp39-abi3-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:78afa90e8ee48dfe84b8accb3399212297cb2b05f7e5288ae57614f68a2cffd8"},
{file = "spiff_element_units-0.1.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:aab909b3d8f896eb2f2ed64273a7e013564a708e875883dafe76cdb34e459cb3"},
{file = "spiff_element_units-0.1.0-cp39-abi3-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:1a264c4ad717d83fe1d7b31d4c9a26143a3a9eff9cfdb09216f4fc1ef029e178"},
{file = "spiff_element_units-0.1.0-cp39-abi3-win32.whl", hash = "sha256:9b2e25b8b18ae006c39cd0b8bac11b08d60bf0e53b60a9cca9a3cec3750f0176"},
{file = "spiff_element_units-0.1.0-cp39-abi3-win_amd64.whl", hash = "sha256:825594fa95496db3a9a7826c9cbf95b90b6a5676838bbd9fe23c5ff32a2d2920"},
{file = "spiff_element_units-0.1.0.tar.gz", hash = "sha256:807e207562220f350cd0f0b6a46484c7b976c4effe4b194701179add7abe871a"},
{file = "spiff_element_units-0.3.0-cp39-abi3-macosx_10_7_x86_64.whl", hash = "sha256:35d2ede30e4d2fc715882c44b19e70e7289e1bb6fdff213198ba1e5d073987d1"},
{file = "spiff_element_units-0.3.0-cp39-abi3-macosx_11_0_arm64.whl", hash = "sha256:d2a9dde576144ba040ca7fb11ee5487e7c7507b8ee7f1c6c12d24066b67c24c7"},
{file = "spiff_element_units-0.3.0-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:15e6d02e5fd59c99484afec41f7f96272e38b7f933e17be9a29b2e3bd43b8396"},
{file = "spiff_element_units-0.3.0-cp39-abi3-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:49e2f6472d5eb5c90978295042e2b3d000e3b88e833fd655d3fe30a1ba7d225b"},
{file = "spiff_element_units-0.3.0-cp39-abi3-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:eed527049bded8ea25ead384547e1ea35e99fe97353c78fd4ad014c8cd2b12b8"},
{file = "spiff_element_units-0.3.0-cp39-abi3-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:63a0f60ce401641dba142a95952330bb18f6a9f77f1ff99f8ef2a392e08774b4"},
{file = "spiff_element_units-0.3.0-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:43b13dcd21f722758c5cc6e16c317680dd7011e0b9aca83fd478c28b13eb25e0"},
{file = "spiff_element_units-0.3.0-cp39-abi3-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:2625b15d2c705fe07f6e472f75142e0f036d3bd567e6e5e946f75fd96dc8396a"},
{file = "spiff_element_units-0.3.0-cp39-abi3-win32.whl", hash = "sha256:fe07bbe1e6e036ffb1fd78d719f5e83c160e657cf5e6eeee5938946e9b940116"},
{file = "spiff_element_units-0.3.0-cp39-abi3-win_amd64.whl", hash = "sha256:beacf2857ba196b39a66f481e9989513e4df607e6d2157d4c1c1e463890ca806"},
{file = "spiff_element_units-0.3.0.tar.gz", hash = "sha256:5c740c70adf7e0fc39c9a3a199c4f1d5d5941d61b28c90637b64df4dc3df57dd"},
]
[[package]]
@ -3938,4 +3938,4 @@ testing = ["big-O", "flake8 (<5)", "jaraco.functools", "jaraco.itertools", "more
[metadata]
lock-version = "2.0"
python-versions = ">=3.9,<3.12"
content-hash = "53f3340f73de770b4fbebff3fcd396cdf1bc2c082b929ade350f31a9df6c3860"
content-hash = "bc4e0dabc679dd84e7803930d043a934276298715657bf85226ec9541eeebfb9"

View File

@ -86,7 +86,7 @@ prometheus-flask-exporter = "^0.22.3"
safety = "^2.3.5"
sqlalchemy = "^2.0.7"
marshmallow-sqlalchemy = "^0.29.0"
spiff-element-units = "^0.1.0"
spiff-element-units = "^0.3.0"
[tool.poetry.dev-dependencies]
pytest = "^7.1.2"

View File

@ -437,7 +437,7 @@ def _interstitial_stream(process_instance: ProcessInstanceModel) -> Generator[st
)
yield f"data: {current_app.json.dumps(api_error)} \n\n"
return
processor.bpmn_process_instance.refresh_waiting_tasks()
processor.refresh_waiting_tasks()
ready_engine_task_count = get_ready_engine_step_count(processor.bpmn_process_instance)
tasks = get_reportable_tasks()
if ready_engine_task_count == 0:

View File

@ -29,16 +29,23 @@ class ElementUnitsService:
# for now we are importing inside each of these functions, not sure the best
# way to do this in an overall feature flagged strategy but this gets things
# moving
import spiff_element_units # type: ignore
import spiff_element_units
cache_dir = cls._cache_dir()
if cache_dir is None:
# make mypy happy
return None
bpmn_spec_json = json.dumps(bpmn_spec_dict)
spiff_element_units.cache_element_units_for_workflow(cls._cache_dir(), cache_key, bpmn_spec_json)
spiff_element_units.cache_element_units_for_workflow(cache_dir, cache_key, bpmn_spec_json)
except Exception as e:
current_app.logger.exception(e)
return None
@classmethod
def workflow_from_cached_element_unit(cls, cache_key: str, element_id: str) -> Optional[BpmnSpecDict]:
def workflow_from_cached_element_unit(
cls, cache_key: str, process_id: str, element_id: str
) -> Optional[BpmnSpecDict]:
if not cls._enabled():
return None
@ -48,8 +55,15 @@ class ElementUnitsService:
# moving
import spiff_element_units
cache_dir = cls._cache_dir()
if cache_dir is None:
# make mypy happy
return None
current_app.logger.info(f"Checking element unit cache @ {cache_key} :: '{process_id}' - '{element_id}'")
bpmn_spec_json = spiff_element_units.workflow_from_cached_element_unit(
cls._cache_dir(), cache_key, element_id
cache_dir, cache_key, process_id, element_id
)
return json.loads(bpmn_spec_json) # type: ignore
except Exception as e:

View File

@ -680,6 +680,7 @@ class ProcessInstanceProcessor:
element_unit_process_dict = ElementUnitsService.workflow_from_cached_element_unit(
full_process_model_hash,
bpmn_process_definition.bpmn_identifier,
bpmn_process_definition.bpmn_identifier,
)
if element_unit_process_dict is not None:
spiff_bpmn_process_dict["spec"] = element_unit_process_dict["spec"]
@ -693,6 +694,10 @@ class ProcessInstanceProcessor:
bpmn_subprocesses = BpmnProcessModel.query.filter_by(top_level_process_id=bpmn_process.id).all()
bpmn_subprocess_id_to_guid_mappings = {}
for bpmn_subprocess in bpmn_subprocesses:
subprocess_identifier = bpmn_subprocess.bpmn_process_definition.bpmn_identifier
if subprocess_identifier not in spiff_bpmn_process_dict["subprocess_specs"]:
current_app.logger.info(f"Deferring subprocess spec: '{subprocess_identifier}'")
continue
bpmn_subprocess_id_to_guid_mappings[bpmn_subprocess.id] = bpmn_subprocess.guid
single_bpmn_process_dict = cls._get_bpmn_process_dict(bpmn_subprocess)
spiff_bpmn_process_dict["subprocesses"][bpmn_subprocess.guid] = single_bpmn_process_dict
@ -1544,6 +1549,50 @@ class ProcessInstanceProcessor:
db.session.add(message_instance)
db.session.commit()
def element_unit_specs_loader(self, process_id: str, element_id: str) -> Optional[Dict[str, Any]]:
full_process_model_hash = self.process_instance_model.bpmn_process_definition.full_process_model_hash
if full_process_model_hash is None:
return None
element_unit_process_dict = ElementUnitsService.workflow_from_cached_element_unit(
full_process_model_hash,
process_id,
element_id,
)
if element_unit_process_dict is not None:
spec_dict = element_unit_process_dict["spec"]
subprocess_specs_dict = element_unit_process_dict["subprocess_specs"]
restored_specs = {k: self.wf_spec_converter.restore(v) for k, v in subprocess_specs_dict.items()}
restored_specs[spec_dict["name"]] = self.wf_spec_converter.restore(spec_dict)
return restored_specs
return None
def lazy_load_subprocess_specs(self) -> None:
tasks = self.bpmn_process_instance.get_tasks(TaskState.DEFINITE_MASK)
loaded_specs = set(self.bpmn_process_instance.subprocess_specs.keys())
for task in tasks:
if task.task_spec.spec_type != "Call Activity":
continue
spec_to_check = task.task_spec.spec
if spec_to_check not in loaded_specs:
lazy_subprocess_specs = self.element_unit_specs_loader(spec_to_check, spec_to_check)
if lazy_subprocess_specs is None:
continue
for name, spec in lazy_subprocess_specs.items():
if name not in loaded_specs:
self.bpmn_process_instance.subprocess_specs[name] = spec
loaded_specs.add(name)
def refresh_waiting_tasks(self) -> None:
self.lazy_load_subprocess_specs()
self.bpmn_process_instance.refresh_waiting_tasks()
def do_engine_steps(
self,
exit_at: None = None,
@ -1577,7 +1626,9 @@ class ProcessInstanceProcessor:
"SPIFFWORKFLOW_BACKEND_ENGINE_STEP_DEFAULT_STRATEGY_WEB has not been set"
)
execution_strategy = execution_strategy_named(execution_strategy_name, task_model_delegate)
execution_strategy = execution_strategy_named(
execution_strategy_name, task_model_delegate, self.lazy_load_subprocess_specs
)
execution_service = WorkflowExecutionService(
self.bpmn_process_instance,
self.process_instance_model,

View File

@ -3,7 +3,10 @@ from __future__ import annotations
import copy
import time
from abc import abstractmethod
from typing import Any
from typing import Callable
from typing import Dict
from typing import Optional
from uuid import UUID
from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflowSerializer # type: ignore
@ -74,12 +77,16 @@ class EngineStepDelegate:
pass
SubprocessSpecLoader = Callable[[], Optional[Dict[str, Any]]]
class ExecutionStrategy:
"""Interface of sorts for a concrete execution strategy."""
def __init__(self, delegate: EngineStepDelegate):
def __init__(self, delegate: EngineStepDelegate, subprocess_spec_loader: SubprocessSpecLoader):
"""__init__."""
self.delegate = delegate
self.subprocess_spec_loader = subprocess_spec_loader
@abstractmethod
def spiff_run(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None:
@ -92,7 +99,7 @@ class ExecutionStrategy:
self.delegate.save(bpmn_process_instance)
def get_ready_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> list[SpiffTask]:
return list(
tasks = list(
[
t
for t in bpmn_process_instance.get_tasks(TaskState.READY)
@ -100,6 +107,12 @@ class ExecutionStrategy:
]
)
if len(tasks) > 0:
self.subprocess_spec_loader()
tasks = [tasks[0]]
return tasks
class TaskModelSavingDelegate(EngineStepDelegate):
"""Engine step delegate that takes care of saving a task model to the database.
@ -263,19 +276,14 @@ class GreedyExecutionStrategy(ExecutionStrategy):
spiff.refresh_waiting_tasks is the thing that pushes some waiting tasks to READY.
"""
self.bpmn_process_instance.do_engine_steps(
exit_at=exit_at,
will_complete_task=self.delegate.will_complete_task,
did_complete_task=self.delegate.did_complete_task,
)
self.bpmn_process_instance.refresh_waiting_tasks()
ready_tasks = self.bpmn_process_instance.get_tasks(TaskState.READY)
non_human_waiting_task = next(
(p for p in ready_tasks if p.task_spec.spec_type not in ["User Task", "Manual Task"]), None
)
if non_human_waiting_task is not None:
self.run_until_user_input_required(exit_at)
engine_steps = self.get_ready_engine_steps(self.bpmn_process_instance)
while engine_steps:
for spiff_task in engine_steps:
self.delegate.will_complete_task(spiff_task)
spiff_task.run()
self.delegate.did_complete_task(spiff_task)
self.bpmn_process_instance.refresh_waiting_tasks()
engine_steps = self.get_ready_engine_steps(self.bpmn_process_instance)
class RunUntilServiceTaskExecutionStrategy(ExecutionStrategy):
@ -338,7 +346,9 @@ class OneAtATimeExecutionStrategy(ExecutionStrategy):
self.delegate.after_engine_steps(bpmn_process_instance)
def execution_strategy_named(name: str, delegate: EngineStepDelegate) -> ExecutionStrategy:
def execution_strategy_named(
name: str, delegate: EngineStepDelegate, spec_loader: SubprocessSpecLoader
) -> ExecutionStrategy:
cls = {
"greedy": GreedyExecutionStrategy,
"run_until_service_task": RunUntilServiceTaskExecutionStrategy,
@ -346,7 +356,7 @@ def execution_strategy_named(name: str, delegate: EngineStepDelegate) -> Executi
"one_at_a_time": OneAtATimeExecutionStrategy,
}[name]
return cls(delegate) # type: ignore
return cls(delegate, spec_loader) # type: ignore
ProcessInstanceCompleter = Callable[[BpmnWorkflow], None]

View File

@ -93,7 +93,7 @@ class TestElementUnitsService(BaseTest):
self,
app_disabled: Flask,
) -> None:
result = ElementUnitsService.workflow_from_cached_element_unit("", "")
result = ElementUnitsService.workflow_from_cached_element_unit("", "", "")
assert result is None
def test_can_write_to_cache(
@ -122,12 +122,12 @@ class TestElementUnitsService(BaseTest):
example_specs_dict: BpmnSpecDict,
) -> None:
ElementUnitsService.cache_element_units_for_workflow("testing", example_specs_dict)
cached_specs_dict = ElementUnitsService.workflow_from_cached_element_unit("testing", "no_tasks")
assert cached_specs_dict == example_specs_dict
cached_specs_dict = ElementUnitsService.workflow_from_cached_element_unit("testing", "no_tasks", "no_tasks")
assert cached_specs_dict["spec"]["name"] == example_specs_dict["spec"]["name"] # type: ignore
def test_reading_element_unit_for_uncached_process_returns_none(
self,
app_enabled_tmp_cache_dir: Flask,
) -> None:
cached_specs_dict = ElementUnitsService.workflow_from_cached_element_unit("testing", "no_tasks")
cached_specs_dict = ElementUnitsService.workflow_from_cached_element_unit("testing", "no_tasks", "")
assert cached_specs_dict is None