From 9043a08a9ecdef6f082f6b46f567cc19cf9af3f4 Mon Sep 17 00:00:00 2001 From: jbirddog <100367399+jbirddog@users.noreply.github.com> Date: Wed, 10 Apr 2024 11:22:52 -0400 Subject: [PATCH] Drop element units as a dependency (#1366) --- spiffworkflow-backend/poetry.lock | 24 +-- spiffworkflow-backend/pyproject.toml | 1 - .../services/element_units_service.py | 67 -------- .../services/process_instance_processor.py | 107 +------------ .../services/workflow_execution_service.py | 24 +-- .../unit/test_element_units_service.py | 144 ------------------ 6 files changed, 10 insertions(+), 357 deletions(-) delete mode 100644 spiffworkflow-backend/src/spiffworkflow_backend/services/element_units_service.py delete mode 100644 spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_element_units_service.py diff --git a/spiffworkflow-backend/poetry.lock b/spiffworkflow-backend/poetry.lock index 56f9ff363..bcb81e8be 100644 --- a/spiffworkflow-backend/poetry.lock +++ b/spiffworkflow-backend/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.2 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.1 and should not be changed by hand. 
[[package]] name = "alembic" @@ -3008,26 +3008,6 @@ files = [ {file = "sniffio-1.3.0.tar.gz", hash = "sha256:e60305c5e5d314f5389259b7f22aaa33d8f7dee49763119234af3755c55b9101"}, ] -[[package]] -name = "spiff-element-units" -version = "0.3.1" -description = "" -optional = false -python-versions = ">=3.9" -files = [ - {file = "spiff_element_units-0.3.1-cp39-abi3-macosx_10_7_x86_64.whl", hash = "sha256:992f80f81dfbcbf1b6f0244d3de757d52a571f579a9848c57eb894e1155956ad"}, - {file = "spiff_element_units-0.3.1-cp39-abi3-macosx_11_0_arm64.whl", hash = "sha256:beedf62f877c1944aeba3fb25bba6b0916176600733d0d6359633d71dab3f2eb"}, - {file = "spiff_element_units-0.3.1-cp39-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:742f36f0fcff426883de7c05beb799ed29101d1e369cfe4fdad329c109d07649"}, - {file = "spiff_element_units-0.3.1-cp39-abi3-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:8cb4615924ec87714a651d662d6292edb255e6b0918f58664addb281a3c80465"}, - {file = "spiff_element_units-0.3.1-cp39-abi3-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:b59b6dbe63f4d47eee9aa0d6300785732fb14d5ff875b8efb24ead76e2a6d123"}, - {file = "spiff_element_units-0.3.1-cp39-abi3-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:ab6a7d35dd9004cd6394e5fc844bac434072fdc5dea283b8acc405afe9be38e7"}, - {file = "spiff_element_units-0.3.1-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2ff1e5db36827971309c09fc46287504eec40b7d2184c02ca40751fc49568ab1"}, - {file = "spiff_element_units-0.3.1-cp39-abi3-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:0364b562ae5b5b6204156d58687673272f34db201bef5d62737e6dcbbbf98c82"}, - {file = "spiff_element_units-0.3.1-cp39-abi3-win32.whl", hash = "sha256:e6493786c95e853949620b1cf8f7826a7d3235c0cc0bb6bd299e93eeae275607"}, - {file = "spiff_element_units-0.3.1-cp39-abi3-win_amd64.whl", hash = "sha256:2cb3a4fe9e1629d17ce3374dcabcedd881537fb7013660274e90794b4ffbe1ef"}, - {file = 
"spiff_element_units-0.3.1.tar.gz", hash = "sha256:aced53289dbf8918b2e3acc679c42da05210da4f38c47d46bcad2b9dcb223363"}, -] - [[package]] name = "SpiffWorkflow" version = "3.0.0rc2" @@ -3574,4 +3554,4 @@ tests-strict = ["pytest (==4.6.0)", "pytest (==4.6.0)", "pytest (==6.2.5)", "pyt [metadata] lock-version = "2.0" python-versions = ">=3.10,<3.13" -content-hash = "bc0439c301566977dcf7b8cf896880e4b9b9482699eabb6ef27fac7390997c91" +content-hash = "e9b302855b0c6a7745e69132c1be886a5d1f76e8bbf4dc32efc4a4b4603e331f" diff --git a/spiffworkflow-backend/pyproject.toml b/spiffworkflow-backend/pyproject.toml index bbc160bef..63a6b1de1 100644 --- a/spiffworkflow-backend/pyproject.toml +++ b/spiffworkflow-backend/pyproject.toml @@ -71,7 +71,6 @@ prometheus-flask-exporter = "^0.23.0" sqlalchemy = "^2.0.7" marshmallow-sqlalchemy = "^1.0.0" -spiff-element-units = "^0.3.1" # mysqlclient lib is deemed better than the mysql-connector-python lib by sqlalchemy # https://docs.sqlalchemy.org/en/20/dialects/mysql.html#module-sqlalchemy.dialects.mysql.mysqlconnector diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/element_units_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/element_units_service.py deleted file mode 100644 index d51df7b38..000000000 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/element_units_service.py +++ /dev/null @@ -1,67 +0,0 @@ -import json -from typing import Any - -from flask import current_app - -from spiffworkflow_backend.services.feature_flag_service import FeatureFlagService - -BpmnSpecDict = dict[str, Any] - - -class ElementUnitsService: - """Feature gated glue between the backend and spiff-element-units.""" - - @classmethod - def _cache_dir(cls) -> str | None: - return current_app.config["SPIFFWORKFLOW_BACKEND_ELEMENT_UNITS_CACHE_DIR"] # type: ignore - - @classmethod - def _enabled(cls) -> bool: - enabled = FeatureFlagService.feature_enabled("element_units", False) - return enabled and cls._cache_dir() 
is not None - - @classmethod - def cache_element_units_for_workflow(cls, cache_key: str, bpmn_spec_dict: BpmnSpecDict) -> None: - if not cls._enabled(): - return None - - try: - # for now we are importing inside each of these functions, not sure the best - # way to do this in an overall feature flagged strategy but this gets things - # moving - import spiff_element_units - - cache_dir = cls._cache_dir() - if cache_dir is None: - # make mypy happy - return None - - bpmn_spec_json = json.dumps(bpmn_spec_dict) - spiff_element_units.cache_element_units_for_workflow(cache_dir, cache_key, bpmn_spec_json) - except Exception as e: - current_app.logger.exception(e) - return None - - @classmethod - def workflow_from_cached_element_unit(cls, cache_key: str, process_id: str, element_id: str) -> BpmnSpecDict | None: - if not cls._enabled(): - return None - - try: - # for now we are importing inside each of these functions, not sure the best - # way to do this in an overall feature flagged strategy but this gets things - # moving - import spiff_element_units - - cache_dir = cls._cache_dir() - if cache_dir is None: - # make mypy happy - return None - - current_app.logger.debug(f"Checking element unit cache @ {cache_key} :: '{process_id}' - '{element_id}'") - - bpmn_spec_json = spiff_element_units.workflow_from_cached_element_unit(cache_dir, cache_key, process_id, element_id) - return json.loads(bpmn_spec_json) # type: ignore - except Exception as e: - current_app.logger.exception(e) - return None diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index e5bdaba09..ea3f59b08 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -90,7 +90,6 @@ from spiffworkflow_backend.models.task_definition import 
TaskDefinitionModel from spiffworkflow_backend.models.user import UserModel from spiffworkflow_backend.scripts.script import Script from spiffworkflow_backend.services.custom_parser import MyCustomParser -from spiffworkflow_backend.services.element_units_service import ElementUnitsService from spiffworkflow_backend.services.file_system_service import FileSystemService from spiffworkflow_backend.services.jinja_service import JinjaHelpers from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceQueueService @@ -730,42 +729,6 @@ class ProcessInstanceProcessor: bpmn_definition_to_task_definitions_mappings, ) - # - # see if we have any cached element units and if so step on the spec and subprocess_specs. - # in the early stages of development this will return the full workflow when the feature - # flag is set to on. as time goes we will need to think about how this plays in with the - # bpmn definition tables more. - # - - subprocess_specs_for_ready_tasks = set() - element_unit_process_dict = None - full_process_model_hash = bpmn_process_definition.full_process_model_hash - - if full_process_model_hash is not None: - process_id = bpmn_process_definition.bpmn_identifier - element_id = bpmn_process_definition.bpmn_identifier - - subprocess_specs_for_ready_tasks = { - r.bpmn_identifier - for r in db.session.query(BpmnProcessDefinitionModel.bpmn_identifier) # type: ignore - .join(TaskDefinitionModel) - .join(TaskModel) - .filter(TaskModel.process_instance_id == process_instance_model.id) - .filter(TaskModel.state == "READY") - .all() - } - - element_unit_process_dict = ElementUnitsService.workflow_from_cached_element_unit( - full_process_model_hash, process_id, element_id - ) - - if element_unit_process_dict is not None: - spiff_bpmn_process_dict["spec"] = element_unit_process_dict["spec"] - keys = list(spiff_bpmn_process_dict["subprocess_specs"].keys()) - for k in keys: - if k not in subprocess_specs_for_ready_tasks and k not in 
element_unit_process_dict["subprocess_specs"]: - spiff_bpmn_process_dict["subprocess_specs"].pop(k) - bpmn_process = process_instance_model.bpmn_process if bpmn_process is not None: single_bpmn_process_dict = cls._get_bpmn_process_dict( @@ -1076,26 +1039,6 @@ class ProcessInstanceProcessor: ) process_instance_model.bpmn_process_definition = bpmn_process_definition_parent - # - # builds and caches the element units for the parent bpmn process defintion. these - # element units can then be queried using the same hash for later execution. - # - # TODO: this seems to be run each time a process instance is started, so element - # units will only be queried after a save/resume point. the hash used as the key - # can be anything, so possibly some hash of all files required to form the process - # definition and their hashes could be used? Not sure how that plays in with the - # bpmn_process_defintion hash though. - # - - # TODO: first time through for an instance the bpmn_spec_dict seems to get mutated, - # so for now we don't seed the cache until the second instance. not immediately a - # problem and can be part of the larger discussion mentioned in the TODO above. 
- - full_process_model_hash = bpmn_process_definition_parent.full_process_model_hash - - if full_process_model_hash is not None and "task_specs" in bpmn_spec_dict["spec"]: - ElementUnitsService.cache_element_units_for_workflow(full_process_model_hash, bpmn_spec_dict) - def save(self) -> None: """Saves the current state of this processor to the database.""" self.process_instance_model.spiff_serializer_version = SPIFFWORKFLOW_BACKEND_SERIALIZER_VERSION @@ -1235,9 +1178,7 @@ class ProcessInstanceProcessor: process_instance=self.process_instance_model, bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings, ) - execution_strategy = SkipOneExecutionStrategy( - task_model_delegate, self.lazy_load_subprocess_specs, {"spiff_task": spiff_task} - ) + execution_strategy = SkipOneExecutionStrategy(task_model_delegate, {"spiff_task": spiff_task}) self.do_engine_steps(save=True, execution_strategy=execution_strategy) spiff_tasks = self.bpmn_process_instance.get_tasks() @@ -1436,49 +1377,7 @@ class ProcessInstanceProcessor: # current_app.logger.debug(f"the_status: {the_status} for instance {self.process_instance_model.id}") return the_status - def element_unit_specs_loader(self, process_id: str, element_id: str) -> dict[str, Any] | None: - full_process_model_hash = self.process_instance_model.bpmn_process_definition.full_process_model_hash - if full_process_model_hash is None: - return None - - element_unit_process_dict = ElementUnitsService.workflow_from_cached_element_unit( - full_process_model_hash, - process_id, - element_id, - ) - - if element_unit_process_dict is not None: - spec_dict = element_unit_process_dict["spec"] - subprocess_specs_dict = element_unit_process_dict["subprocess_specs"] - - restored_specs = {k: self.wf_spec_converter.restore(v) for k, v in subprocess_specs_dict.items()} - restored_specs[spec_dict["name"]] = self.wf_spec_converter.restore(spec_dict) - - return restored_specs - - return None - - def 
lazy_load_subprocess_specs(self) -> None: - tasks = self.bpmn_process_instance.get_tasks(state=TaskState.DEFINITE_MASK) - loaded_specs = set(self.bpmn_process_instance.subprocess_specs.keys()) - for task in tasks: - if task.task_spec.__class__.__name__ != "CallActivity": - continue - spec_to_check = task.task_spec.spec - - if spec_to_check not in loaded_specs: - lazy_subprocess_specs = self.element_unit_specs_loader(spec_to_check, spec_to_check) - if lazy_subprocess_specs is None: - continue - - for name, spec in lazy_subprocess_specs.items(): - if name not in loaded_specs: - self.bpmn_process_instance.subprocess_specs[name] = spec - self.refresh_waiting_tasks() - loaded_specs.add(name) - def refresh_waiting_tasks(self) -> None: - self.lazy_load_subprocess_specs() self.bpmn_process_instance.refresh_waiting_tasks() def do_engine_steps( @@ -1530,9 +1429,7 @@ class ProcessInstanceProcessor: raise ExecutionStrategyNotConfiguredError( "SPIFFWORKFLOW_BACKEND_ENGINE_STEP_DEFAULT_STRATEGY_WEB has not been set" ) - execution_strategy = execution_strategy_named( - execution_strategy_name, task_model_delegate, self.lazy_load_subprocess_specs - ) + execution_strategy = execution_strategy_named(execution_strategy_name, task_model_delegate) execution_service = WorkflowExecutionService( self.bpmn_process_instance, diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py index 2bcfabd8c..dd7c795c7 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py @@ -90,15 +90,11 @@ class EngineStepDelegate: pass -SubprocessSpecLoader = Callable[[], dict[str, Any] | None] - - class ExecutionStrategy: """Interface of sorts for a concrete execution strategy.""" - def __init__(self, delegate: EngineStepDelegate, 
subprocess_spec_loader: SubprocessSpecLoader, options: dict | None = None): + def __init__(self, delegate: EngineStepDelegate, options: dict | None = None): self.delegate = delegate - self.subprocess_spec_loader = subprocess_spec_loader self.options = options def should_break_before(self, tasks: list[SpiffTask], process_instance_model: ProcessInstanceModel) -> bool: @@ -185,15 +181,7 @@ class ExecutionStrategy: self.delegate.add_object_to_db_session(bpmn_process_instance) def get_ready_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> list[SpiffTask]: - tasks = [t for t in bpmn_process_instance.get_tasks(state=TaskState.READY) if not t.task_spec.manual] - - if len(tasks) > 0: - self.subprocess_spec_loader() - - # TODO: verify the other execution strategies work still and delete to make this work like the name - # tasks = [tasks[0]] - - return tasks + return [t for t in bpmn_process_instance.get_tasks(state=TaskState.READY) if not t.task_spec.manual] class TaskModelSavingDelegate(EngineStepDelegate): @@ -310,8 +298,8 @@ class QueueInstructionsForEndUserExecutionStrategy(ExecutionStrategy): The queue can be used to display the instructions to user later. 
""" - def __init__(self, delegate: EngineStepDelegate, subprocess_spec_loader: SubprocessSpecLoader, options: dict | None = None): - super().__init__(delegate, subprocess_spec_loader, options) + def __init__(self, delegate: EngineStepDelegate, options: dict | None = None): + super().__init__(delegate, options) self.tasks_that_have_been_seen: set[str] = set() def should_do_before(self, bpmn_process_instance: BpmnWorkflow, process_instance_model: ProcessInstanceModel) -> None: @@ -388,7 +376,7 @@ class SkipOneExecutionStrategy(ExecutionStrategy): return TaskRunnability.has_ready_tasks if len(engine_steps) > 1 else TaskRunnability.unknown_if_ready_tasks -def execution_strategy_named(name: str, delegate: EngineStepDelegate, spec_loader: SubprocessSpecLoader) -> ExecutionStrategy: +def execution_strategy_named(name: str, delegate: EngineStepDelegate) -> ExecutionStrategy: cls = { "greedy": GreedyExecutionStrategy, "queue_instructions_for_end_user": QueueInstructionsForEndUserExecutionStrategy, @@ -397,7 +385,7 @@ def execution_strategy_named(name: str, delegate: EngineStepDelegate, spec_loade "skip_one": SkipOneExecutionStrategy, }[name] - return cls(delegate, spec_loader) + return cls(delegate) ProcessInstanceCompleter = Callable[[BpmnWorkflow], None] diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_element_units_service.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_element_units_service.py deleted file mode 100644 index 2152a94cd..000000000 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_element_units_service.py +++ /dev/null @@ -1,144 +0,0 @@ -import json -import os -import tempfile -from collections.abc import Generator - -import pytest -from flask.app import Flask -from spiffworkflow_backend.models.db import db -from spiffworkflow_backend.models.feature_flag import FeatureFlagModel -from spiffworkflow_backend.services.element_units_service import BpmnSpecDict -from 
spiffworkflow_backend.services.element_units_service import ElementUnitsService -from spiffworkflow_backend.services.feature_flag_service import FeatureFlagService - -from tests.spiffworkflow_backend.helpers.base_test import BaseTest - -# -# we don't want to fully flex every aspect of the spiff-element-units -# library here, mainly just checking that our interaction with it is -# as expected. -# - - -@pytest.fixture() -def feature_enabled(app: Flask, with_db_and_bpmn_file_cleanup: None) -> Generator[None, None, None]: - db.session.query(FeatureFlagModel).delete() - db.session.commit() - FeatureFlagService.set_feature_flags({"element_units": True}, {}) - yield - - -@pytest.fixture() -def feature_disabled(app: Flask, with_db_and_bpmn_file_cleanup: None) -> Generator[None, None, None]: - db.session.query(FeatureFlagModel).delete() - db.session.commit() - FeatureFlagService.set_feature_flags({"element_units": False}, {}) - yield - - -@pytest.fixture() -def app_no_cache_dir(app: Flask) -> Generator[Flask, None, None]: - with BaseTest().app_config_mock(app, "SPIFFWORKFLOW_BACKEND_ELEMENT_UNITS_CACHE_DIR", None): - yield app - - -@pytest.fixture() -def app_some_cache_dir(app: Flask) -> Generator[Flask, None, None]: - with BaseTest().app_config_mock(app, "SPIFFWORKFLOW_BACKEND_ELEMENT_UNITS_CACHE_DIR", "some_cache_dir"): - yield app - - -@pytest.fixture() -def app_tmp_cache_dir(app: Flask) -> Generator[Flask, None, None]: - with tempfile.TemporaryDirectory() as tmpdirname: - with BaseTest().app_config_mock(app, "SPIFFWORKFLOW_BACKEND_ELEMENT_UNITS_CACHE_DIR", tmpdirname): - yield app - - -@pytest.fixture() -def example_specs_dict(app: Flask) -> Generator[BpmnSpecDict, None, None]: - path = os.path.join(app.instance_path, "..", "..", "tests", "data", "specs-json", "no-tasks.json") - with open(path) as f: - yield json.loads(f.read()) - - -class TestElementUnitsService(BaseTest): - """Tests the ElementUnitsService.""" - - def test_cache_dir_env_is_respected( - self, - 
app_some_cache_dir: Flask, - ) -> None: - assert ElementUnitsService._cache_dir() == "some_cache_dir" - - def test_feature_disabled_if_feature_flag_is_false( - self, - feature_disabled: None, - ) -> None: - assert not ElementUnitsService._enabled() - - def test_feature_enabled_if_env_is_true( - self, - feature_enabled: None, - ) -> None: - assert ElementUnitsService._enabled() - - def test_is_disabled_when_no_cache_dir( - self, - app_no_cache_dir: Flask, - ) -> None: - assert not ElementUnitsService._enabled() - - def test_ok_to_cache_when_disabled( - self, - feature_disabled: None, - ) -> None: - result = ElementUnitsService.cache_element_units_for_workflow("", {}) - assert result is None - - def test_ok_to_read_workflow_from_cached_element_unit_when_disabled( - self, - feature_disabled: None, - ) -> None: - result = ElementUnitsService.workflow_from_cached_element_unit("", "", "") - assert result is None - - def test_can_write_to_cache( - self, - app_tmp_cache_dir: Flask, - feature_enabled: None, - example_specs_dict: BpmnSpecDict, - ) -> None: - result = ElementUnitsService.cache_element_units_for_workflow("testing", example_specs_dict) - assert result is None - - def test_can_write_to_cache_multiple_times( - self, - app_tmp_cache_dir: Flask, - feature_enabled: None, - example_specs_dict: BpmnSpecDict, - ) -> None: - result = ElementUnitsService.cache_element_units_for_workflow("testing", example_specs_dict) - assert result is None - result = ElementUnitsService.cache_element_units_for_workflow("testing", example_specs_dict) - assert result is None - result = ElementUnitsService.cache_element_units_for_workflow("testing", example_specs_dict) - assert result is None - - def test_can_read_element_unit_for_process_from_cache( - self, - app_tmp_cache_dir: Flask, - feature_enabled: None, - example_specs_dict: BpmnSpecDict, - ) -> None: - ElementUnitsService.cache_element_units_for_workflow("testing", example_specs_dict) - cached_specs_dict = 
ElementUnitsService.workflow_from_cached_element_unit("testing", "no_tasks", "no_tasks") - assert cached_specs_dict["spec"]["name"] == example_specs_dict["spec"]["name"] # type: ignore - - def test_reading_element_unit_for_uncached_process_returns_none( - self, - app_tmp_cache_dir: Flask, - feature_enabled: None, - ) -> None: - cached_specs_dict = ElementUnitsService.workflow_from_cached_element_unit("testing", "no_tasks", "") - assert cached_specs_dict is None