Drop element units as a dependecy (#1366)

This commit is contained in:
jbirddog 2024-04-10 11:22:52 -04:00 committed by GitHub
parent 2ae1ab45c8
commit 9043a08a9e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 10 additions and 357 deletions

View File

@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand. # This file is automatically @generated by Poetry 1.8.1 and should not be changed by hand.
[[package]] [[package]]
name = "alembic" name = "alembic"
@ -3008,26 +3008,6 @@ files = [
{file = "sniffio-1.3.0.tar.gz", hash = "sha256:e60305c5e5d314f5389259b7f22aaa33d8f7dee49763119234af3755c55b9101"}, {file = "sniffio-1.3.0.tar.gz", hash = "sha256:e60305c5e5d314f5389259b7f22aaa33d8f7dee49763119234af3755c55b9101"},
] ]
[[package]]
name = "spiff-element-units"
version = "0.3.1"
description = ""
optional = false
python-versions = ">=3.9"
files = [
{file = "spiff_element_units-0.3.1-cp39-abi3-macosx_10_7_x86_64.whl", hash = "sha256:992f80f81dfbcbf1b6f0244d3de757d52a571f579a9848c57eb894e1155956ad"},
{file = "spiff_element_units-0.3.1-cp39-abi3-macosx_11_0_arm64.whl", hash = "sha256:beedf62f877c1944aeba3fb25bba6b0916176600733d0d6359633d71dab3f2eb"},
{file = "spiff_element_units-0.3.1-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:742f36f0fcff426883de7c05beb799ed29101d1e369cfe4fdad329c109d07649"},
{file = "spiff_element_units-0.3.1-cp39-abi3-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:8cb4615924ec87714a651d662d6292edb255e6b0918f58664addb281a3c80465"},
{file = "spiff_element_units-0.3.1-cp39-abi3-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:b59b6dbe63f4d47eee9aa0d6300785732fb14d5ff875b8efb24ead76e2a6d123"},
{file = "spiff_element_units-0.3.1-cp39-abi3-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:ab6a7d35dd9004cd6394e5fc844bac434072fdc5dea283b8acc405afe9be38e7"},
{file = "spiff_element_units-0.3.1-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2ff1e5db36827971309c09fc46287504eec40b7d2184c02ca40751fc49568ab1"},
{file = "spiff_element_units-0.3.1-cp39-abi3-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:0364b562ae5b5b6204156d58687673272f34db201bef5d62737e6dcbbbf98c82"},
{file = "spiff_element_units-0.3.1-cp39-abi3-win32.whl", hash = "sha256:e6493786c95e853949620b1cf8f7826a7d3235c0cc0bb6bd299e93eeae275607"},
{file = "spiff_element_units-0.3.1-cp39-abi3-win_amd64.whl", hash = "sha256:2cb3a4fe9e1629d17ce3374dcabcedd881537fb7013660274e90794b4ffbe1ef"},
{file = "spiff_element_units-0.3.1.tar.gz", hash = "sha256:aced53289dbf8918b2e3acc679c42da05210da4f38c47d46bcad2b9dcb223363"},
]
[[package]] [[package]]
name = "SpiffWorkflow" name = "SpiffWorkflow"
version = "3.0.0rc2" version = "3.0.0rc2"
@ -3574,4 +3554,4 @@ tests-strict = ["pytest (==4.6.0)", "pytest (==4.6.0)", "pytest (==6.2.5)", "pyt
[metadata] [metadata]
lock-version = "2.0" lock-version = "2.0"
python-versions = ">=3.10,<3.13" python-versions = ">=3.10,<3.13"
content-hash = "bc0439c301566977dcf7b8cf896880e4b9b9482699eabb6ef27fac7390997c91" content-hash = "e9b302855b0c6a7745e69132c1be886a5d1f76e8bbf4dc32efc4a4b4603e331f"

View File

@ -71,7 +71,6 @@ prometheus-flask-exporter = "^0.23.0"
sqlalchemy = "^2.0.7" sqlalchemy = "^2.0.7"
marshmallow-sqlalchemy = "^1.0.0" marshmallow-sqlalchemy = "^1.0.0"
spiff-element-units = "^0.3.1"
# mysqlclient lib is deemed better than the mysql-connector-python lib by sqlalchemy # mysqlclient lib is deemed better than the mysql-connector-python lib by sqlalchemy
# https://docs.sqlalchemy.org/en/20/dialects/mysql.html#module-sqlalchemy.dialects.mysql.mysqlconnector # https://docs.sqlalchemy.org/en/20/dialects/mysql.html#module-sqlalchemy.dialects.mysql.mysqlconnector

View File

@ -1,67 +0,0 @@
import json
from typing import Any
from flask import current_app
from spiffworkflow_backend.services.feature_flag_service import FeatureFlagService
BpmnSpecDict = dict[str, Any]
class ElementUnitsService:
"""Feature gated glue between the backend and spiff-element-units."""
@classmethod
def _cache_dir(cls) -> str | None:
return current_app.config["SPIFFWORKFLOW_BACKEND_ELEMENT_UNITS_CACHE_DIR"] # type: ignore
@classmethod
def _enabled(cls) -> bool:
enabled = FeatureFlagService.feature_enabled("element_units", False)
return enabled and cls._cache_dir() is not None
@classmethod
def cache_element_units_for_workflow(cls, cache_key: str, bpmn_spec_dict: BpmnSpecDict) -> None:
if not cls._enabled():
return None
try:
# for now we are importing inside each of these functions, not sure the best
# way to do this in an overall feature flagged strategy but this gets things
# moving
import spiff_element_units
cache_dir = cls._cache_dir()
if cache_dir is None:
# make mypy happy
return None
bpmn_spec_json = json.dumps(bpmn_spec_dict)
spiff_element_units.cache_element_units_for_workflow(cache_dir, cache_key, bpmn_spec_json)
except Exception as e:
current_app.logger.exception(e)
return None
@classmethod
def workflow_from_cached_element_unit(cls, cache_key: str, process_id: str, element_id: str) -> BpmnSpecDict | None:
if not cls._enabled():
return None
try:
# for now we are importing inside each of these functions, not sure the best
# way to do this in an overall feature flagged strategy but this gets things
# moving
import spiff_element_units
cache_dir = cls._cache_dir()
if cache_dir is None:
# make mypy happy
return None
current_app.logger.debug(f"Checking element unit cache @ {cache_key} :: '{process_id}' - '{element_id}'")
bpmn_spec_json = spiff_element_units.workflow_from_cached_element_unit(cache_dir, cache_key, process_id, element_id)
return json.loads(bpmn_spec_json) # type: ignore
except Exception as e:
current_app.logger.exception(e)
return None

View File

@ -90,7 +90,6 @@ from spiffworkflow_backend.models.task_definition import TaskDefinitionModel
from spiffworkflow_backend.models.user import UserModel from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.scripts.script import Script from spiffworkflow_backend.scripts.script import Script
from spiffworkflow_backend.services.custom_parser import MyCustomParser from spiffworkflow_backend.services.custom_parser import MyCustomParser
from spiffworkflow_backend.services.element_units_service import ElementUnitsService
from spiffworkflow_backend.services.file_system_service import FileSystemService from spiffworkflow_backend.services.file_system_service import FileSystemService
from spiffworkflow_backend.services.jinja_service import JinjaHelpers from spiffworkflow_backend.services.jinja_service import JinjaHelpers
from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceQueueService from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceQueueService
@ -730,42 +729,6 @@ class ProcessInstanceProcessor:
bpmn_definition_to_task_definitions_mappings, bpmn_definition_to_task_definitions_mappings,
) )
#
# see if we have any cached element units and if so step on the spec and subprocess_specs.
# in the early stages of development this will return the full workflow when the feature
# flag is set to on. as time goes we will need to think about how this plays in with the
# bpmn definition tables more.
#
subprocess_specs_for_ready_tasks = set()
element_unit_process_dict = None
full_process_model_hash = bpmn_process_definition.full_process_model_hash
if full_process_model_hash is not None:
process_id = bpmn_process_definition.bpmn_identifier
element_id = bpmn_process_definition.bpmn_identifier
subprocess_specs_for_ready_tasks = {
r.bpmn_identifier
for r in db.session.query(BpmnProcessDefinitionModel.bpmn_identifier) # type: ignore
.join(TaskDefinitionModel)
.join(TaskModel)
.filter(TaskModel.process_instance_id == process_instance_model.id)
.filter(TaskModel.state == "READY")
.all()
}
element_unit_process_dict = ElementUnitsService.workflow_from_cached_element_unit(
full_process_model_hash, process_id, element_id
)
if element_unit_process_dict is not None:
spiff_bpmn_process_dict["spec"] = element_unit_process_dict["spec"]
keys = list(spiff_bpmn_process_dict["subprocess_specs"].keys())
for k in keys:
if k not in subprocess_specs_for_ready_tasks and k not in element_unit_process_dict["subprocess_specs"]:
spiff_bpmn_process_dict["subprocess_specs"].pop(k)
bpmn_process = process_instance_model.bpmn_process bpmn_process = process_instance_model.bpmn_process
if bpmn_process is not None: if bpmn_process is not None:
single_bpmn_process_dict = cls._get_bpmn_process_dict( single_bpmn_process_dict = cls._get_bpmn_process_dict(
@ -1076,26 +1039,6 @@ class ProcessInstanceProcessor:
) )
process_instance_model.bpmn_process_definition = bpmn_process_definition_parent process_instance_model.bpmn_process_definition = bpmn_process_definition_parent
#
# builds and caches the element units for the parent bpmn process defintion. these
# element units can then be queried using the same hash for later execution.
#
# TODO: this seems to be run each time a process instance is started, so element
# units will only be queried after a save/resume point. the hash used as the key
# can be anything, so possibly some hash of all files required to form the process
# definition and their hashes could be used? Not sure how that plays in with the
# bpmn_process_defintion hash though.
#
# TODO: first time through for an instance the bpmn_spec_dict seems to get mutated,
# so for now we don't seed the cache until the second instance. not immediately a
# problem and can be part of the larger discussion mentioned in the TODO above.
full_process_model_hash = bpmn_process_definition_parent.full_process_model_hash
if full_process_model_hash is not None and "task_specs" in bpmn_spec_dict["spec"]:
ElementUnitsService.cache_element_units_for_workflow(full_process_model_hash, bpmn_spec_dict)
def save(self) -> None: def save(self) -> None:
"""Saves the current state of this processor to the database.""" """Saves the current state of this processor to the database."""
self.process_instance_model.spiff_serializer_version = SPIFFWORKFLOW_BACKEND_SERIALIZER_VERSION self.process_instance_model.spiff_serializer_version = SPIFFWORKFLOW_BACKEND_SERIALIZER_VERSION
@ -1235,9 +1178,7 @@ class ProcessInstanceProcessor:
process_instance=self.process_instance_model, process_instance=self.process_instance_model,
bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings, bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings,
) )
execution_strategy = SkipOneExecutionStrategy( execution_strategy = SkipOneExecutionStrategy(task_model_delegate, {"spiff_task": spiff_task})
task_model_delegate, self.lazy_load_subprocess_specs, {"spiff_task": spiff_task}
)
self.do_engine_steps(save=True, execution_strategy=execution_strategy) self.do_engine_steps(save=True, execution_strategy=execution_strategy)
spiff_tasks = self.bpmn_process_instance.get_tasks() spiff_tasks = self.bpmn_process_instance.get_tasks()
@ -1436,49 +1377,7 @@ class ProcessInstanceProcessor:
# current_app.logger.debug(f"the_status: {the_status} for instance {self.process_instance_model.id}") # current_app.logger.debug(f"the_status: {the_status} for instance {self.process_instance_model.id}")
return the_status return the_status
def element_unit_specs_loader(self, process_id: str, element_id: str) -> dict[str, Any] | None:
full_process_model_hash = self.process_instance_model.bpmn_process_definition.full_process_model_hash
if full_process_model_hash is None:
return None
element_unit_process_dict = ElementUnitsService.workflow_from_cached_element_unit(
full_process_model_hash,
process_id,
element_id,
)
if element_unit_process_dict is not None:
spec_dict = element_unit_process_dict["spec"]
subprocess_specs_dict = element_unit_process_dict["subprocess_specs"]
restored_specs = {k: self.wf_spec_converter.restore(v) for k, v in subprocess_specs_dict.items()}
restored_specs[spec_dict["name"]] = self.wf_spec_converter.restore(spec_dict)
return restored_specs
return None
def lazy_load_subprocess_specs(self) -> None:
tasks = self.bpmn_process_instance.get_tasks(state=TaskState.DEFINITE_MASK)
loaded_specs = set(self.bpmn_process_instance.subprocess_specs.keys())
for task in tasks:
if task.task_spec.__class__.__name__ != "CallActivity":
continue
spec_to_check = task.task_spec.spec
if spec_to_check not in loaded_specs:
lazy_subprocess_specs = self.element_unit_specs_loader(spec_to_check, spec_to_check)
if lazy_subprocess_specs is None:
continue
for name, spec in lazy_subprocess_specs.items():
if name not in loaded_specs:
self.bpmn_process_instance.subprocess_specs[name] = spec
self.refresh_waiting_tasks()
loaded_specs.add(name)
def refresh_waiting_tasks(self) -> None: def refresh_waiting_tasks(self) -> None:
self.lazy_load_subprocess_specs()
self.bpmn_process_instance.refresh_waiting_tasks() self.bpmn_process_instance.refresh_waiting_tasks()
def do_engine_steps( def do_engine_steps(
@ -1530,9 +1429,7 @@ class ProcessInstanceProcessor:
raise ExecutionStrategyNotConfiguredError( raise ExecutionStrategyNotConfiguredError(
"SPIFFWORKFLOW_BACKEND_ENGINE_STEP_DEFAULT_STRATEGY_WEB has not been set" "SPIFFWORKFLOW_BACKEND_ENGINE_STEP_DEFAULT_STRATEGY_WEB has not been set"
) )
execution_strategy = execution_strategy_named( execution_strategy = execution_strategy_named(execution_strategy_name, task_model_delegate)
execution_strategy_name, task_model_delegate, self.lazy_load_subprocess_specs
)
execution_service = WorkflowExecutionService( execution_service = WorkflowExecutionService(
self.bpmn_process_instance, self.bpmn_process_instance,

View File

@ -90,15 +90,11 @@ class EngineStepDelegate:
pass pass
SubprocessSpecLoader = Callable[[], dict[str, Any] | None]
class ExecutionStrategy: class ExecutionStrategy:
"""Interface of sorts for a concrete execution strategy.""" """Interface of sorts for a concrete execution strategy."""
def __init__(self, delegate: EngineStepDelegate, subprocess_spec_loader: SubprocessSpecLoader, options: dict | None = None): def __init__(self, delegate: EngineStepDelegate, options: dict | None = None):
self.delegate = delegate self.delegate = delegate
self.subprocess_spec_loader = subprocess_spec_loader
self.options = options self.options = options
def should_break_before(self, tasks: list[SpiffTask], process_instance_model: ProcessInstanceModel) -> bool: def should_break_before(self, tasks: list[SpiffTask], process_instance_model: ProcessInstanceModel) -> bool:
@ -185,15 +181,7 @@ class ExecutionStrategy:
self.delegate.add_object_to_db_session(bpmn_process_instance) self.delegate.add_object_to_db_session(bpmn_process_instance)
def get_ready_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> list[SpiffTask]: def get_ready_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> list[SpiffTask]:
tasks = [t for t in bpmn_process_instance.get_tasks(state=TaskState.READY) if not t.task_spec.manual] return [t for t in bpmn_process_instance.get_tasks(state=TaskState.READY) if not t.task_spec.manual]
if len(tasks) > 0:
self.subprocess_spec_loader()
# TODO: verify the other execution strategies work still and delete to make this work like the name
# tasks = [tasks[0]]
return tasks
class TaskModelSavingDelegate(EngineStepDelegate): class TaskModelSavingDelegate(EngineStepDelegate):
@ -310,8 +298,8 @@ class QueueInstructionsForEndUserExecutionStrategy(ExecutionStrategy):
The queue can be used to display the instructions to user later. The queue can be used to display the instructions to user later.
""" """
def __init__(self, delegate: EngineStepDelegate, subprocess_spec_loader: SubprocessSpecLoader, options: dict | None = None): def __init__(self, delegate: EngineStepDelegate, options: dict | None = None):
super().__init__(delegate, subprocess_spec_loader, options) super().__init__(delegate, options)
self.tasks_that_have_been_seen: set[str] = set() self.tasks_that_have_been_seen: set[str] = set()
def should_do_before(self, bpmn_process_instance: BpmnWorkflow, process_instance_model: ProcessInstanceModel) -> None: def should_do_before(self, bpmn_process_instance: BpmnWorkflow, process_instance_model: ProcessInstanceModel) -> None:
@ -388,7 +376,7 @@ class SkipOneExecutionStrategy(ExecutionStrategy):
return TaskRunnability.has_ready_tasks if len(engine_steps) > 1 else TaskRunnability.unknown_if_ready_tasks return TaskRunnability.has_ready_tasks if len(engine_steps) > 1 else TaskRunnability.unknown_if_ready_tasks
def execution_strategy_named(name: str, delegate: EngineStepDelegate, spec_loader: SubprocessSpecLoader) -> ExecutionStrategy: def execution_strategy_named(name: str, delegate: EngineStepDelegate) -> ExecutionStrategy:
cls = { cls = {
"greedy": GreedyExecutionStrategy, "greedy": GreedyExecutionStrategy,
"queue_instructions_for_end_user": QueueInstructionsForEndUserExecutionStrategy, "queue_instructions_for_end_user": QueueInstructionsForEndUserExecutionStrategy,
@ -397,7 +385,7 @@ def execution_strategy_named(name: str, delegate: EngineStepDelegate, spec_loade
"skip_one": SkipOneExecutionStrategy, "skip_one": SkipOneExecutionStrategy,
}[name] }[name]
return cls(delegate, spec_loader) return cls(delegate)
ProcessInstanceCompleter = Callable[[BpmnWorkflow], None] ProcessInstanceCompleter = Callable[[BpmnWorkflow], None]

View File

@ -1,144 +0,0 @@
import json
import os
import tempfile
from collections.abc import Generator
import pytest
from flask.app import Flask
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.feature_flag import FeatureFlagModel
from spiffworkflow_backend.services.element_units_service import BpmnSpecDict
from spiffworkflow_backend.services.element_units_service import ElementUnitsService
from spiffworkflow_backend.services.feature_flag_service import FeatureFlagService
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
#
# we don't want to fully flex every aspect of the spiff-element-units
# library here, mainly just checking that our interaction with it is
# as expected.
#
@pytest.fixture()
def feature_enabled(app: Flask, with_db_and_bpmn_file_cleanup: None) -> Generator[None, None, None]:
db.session.query(FeatureFlagModel).delete()
db.session.commit()
FeatureFlagService.set_feature_flags({"element_units": True}, {})
yield
@pytest.fixture()
def feature_disabled(app: Flask, with_db_and_bpmn_file_cleanup: None) -> Generator[None, None, None]:
db.session.query(FeatureFlagModel).delete()
db.session.commit()
FeatureFlagService.set_feature_flags({"element_units": False}, {})
yield
@pytest.fixture()
def app_no_cache_dir(app: Flask) -> Generator[Flask, None, None]:
with BaseTest().app_config_mock(app, "SPIFFWORKFLOW_BACKEND_ELEMENT_UNITS_CACHE_DIR", None):
yield app
@pytest.fixture()
def app_some_cache_dir(app: Flask) -> Generator[Flask, None, None]:
with BaseTest().app_config_mock(app, "SPIFFWORKFLOW_BACKEND_ELEMENT_UNITS_CACHE_DIR", "some_cache_dir"):
yield app
@pytest.fixture()
def app_tmp_cache_dir(app: Flask) -> Generator[Flask, None, None]:
with tempfile.TemporaryDirectory() as tmpdirname:
with BaseTest().app_config_mock(app, "SPIFFWORKFLOW_BACKEND_ELEMENT_UNITS_CACHE_DIR", tmpdirname):
yield app
@pytest.fixture()
def example_specs_dict(app: Flask) -> Generator[BpmnSpecDict, None, None]:
path = os.path.join(app.instance_path, "..", "..", "tests", "data", "specs-json", "no-tasks.json")
with open(path) as f:
yield json.loads(f.read())
class TestElementUnitsService(BaseTest):
"""Tests the ElementUnitsService."""
def test_cache_dir_env_is_respected(
self,
app_some_cache_dir: Flask,
) -> None:
assert ElementUnitsService._cache_dir() == "some_cache_dir"
def test_feature_disabled_if_feature_flag_is_false(
self,
feature_disabled: None,
) -> None:
assert not ElementUnitsService._enabled()
def test_feature_enabled_if_env_is_true(
self,
feature_enabled: None,
) -> None:
assert ElementUnitsService._enabled()
def test_is_disabled_when_no_cache_dir(
self,
app_no_cache_dir: Flask,
) -> None:
assert not ElementUnitsService._enabled()
def test_ok_to_cache_when_disabled(
self,
feature_disabled: None,
) -> None:
result = ElementUnitsService.cache_element_units_for_workflow("", {})
assert result is None
def test_ok_to_read_workflow_from_cached_element_unit_when_disabled(
self,
feature_disabled: None,
) -> None:
result = ElementUnitsService.workflow_from_cached_element_unit("", "", "")
assert result is None
def test_can_write_to_cache(
self,
app_tmp_cache_dir: Flask,
feature_enabled: None,
example_specs_dict: BpmnSpecDict,
) -> None:
result = ElementUnitsService.cache_element_units_for_workflow("testing", example_specs_dict)
assert result is None
def test_can_write_to_cache_multiple_times(
self,
app_tmp_cache_dir: Flask,
feature_enabled: None,
example_specs_dict: BpmnSpecDict,
) -> None:
result = ElementUnitsService.cache_element_units_for_workflow("testing", example_specs_dict)
assert result is None
result = ElementUnitsService.cache_element_units_for_workflow("testing", example_specs_dict)
assert result is None
result = ElementUnitsService.cache_element_units_for_workflow("testing", example_specs_dict)
assert result is None
def test_can_read_element_unit_for_process_from_cache(
self,
app_tmp_cache_dir: Flask,
feature_enabled: None,
example_specs_dict: BpmnSpecDict,
) -> None:
ElementUnitsService.cache_element_units_for_workflow("testing", example_specs_dict)
cached_specs_dict = ElementUnitsService.workflow_from_cached_element_unit("testing", "no_tasks", "no_tasks")
assert cached_specs_dict["spec"]["name"] == example_specs_dict["spec"]["name"] # type: ignore
def test_reading_element_unit_for_uncached_process_returns_none(
self,
app_tmp_cache_dir: Flask,
feature_enabled: None,
) -> None:
cached_specs_dict = ElementUnitsService.workflow_from_cached_element_unit("testing", "no_tasks", "")
assert cached_specs_dict is None