diff --git a/.flake8 b/.flake8
index fe5c9a94a..9c54dc0e7 100644
--- a/.flake8
+++ b/.flake8
@@ -17,10 +17,10 @@ per-file-ignores =
   # THEN, test_hey.py will NOT be excluding D103
   # asserts are ok in tests
-  spiffworkflow-backend/tests/*:S101,D100,D101,D102,D103
+  spiffworkflow-backend/tests/*:S101,D100,D101,D102,D103,D107

   # prefer naming functions descriptively rather than forcing comments
-  spiffworkflow-backend/src/*:D100,D101,D102,D103
+  spiffworkflow-backend/src/*:D100,D101,D102,D103,D107

   spiffworkflow-backend/bin/keycloak_test_server.py:B950,D
   spiffworkflow-backend/conftest.py:S105

diff --git a/spiffworkflow-backend/.flake8 b/spiffworkflow-backend/.flake8
index 0fceed158..2e6554e58 100644
--- a/spiffworkflow-backend/.flake8
+++ b/spiffworkflow-backend/.flake8
@@ -17,10 +17,10 @@ per-file-ignores =
   # THEN, test_hey.py will NOT be excluding D103
   # asserts are ok in tests
-  tests/*:S101,D100,D101,D102,D103
+  tests/*:S101,D100,D101,D102,D103,D107

   # prefer naming functions descriptively rather than forcing comments
-  src/*:D100,D101,D102,D103
+  src/*:D100,D101,D102,D103,D107

   bin/keycloak_test_server.py:B950,D
   conftest.py:S105

diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py
index 30f8a481c..5242c066d 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py
@@ -50,7 +50,6 @@ from SpiffWorkflow.serializer.exceptions import MissingSpecError  # type: ignore
 from SpiffWorkflow.spiff.serializer.config import SPIFF_SPEC_CONFIG  # type: ignore
 from SpiffWorkflow.task import Task as SpiffTask  # type: ignore
 from SpiffWorkflow.task import TaskState
-from SpiffWorkflow.task import TaskStateNames
 from SpiffWorkflow.util.deep_merge import DeepMerge  # type: ignore
 from sqlalchemy import text

@@ -93,6 +92,7 @@ from spiffworkflow_backend.services.file_system_service import FileSystemService
 from spiffworkflow_backend.services.process_model_service import ProcessModelService
 from spiffworkflow_backend.services.service_task_service import ServiceTaskDelegate
 from spiffworkflow_backend.services.spec_file_service import SpecFileService
+from spiffworkflow_backend.services.task_service import TaskService
 from spiffworkflow_backend.services.user_service import UserService
 from spiffworkflow_backend.services.workflow_execution_service import (
     execution_strategy_named,
@@ -100,6 +100,9 @@ from spiffworkflow_backend.services.workflow_execution_service import (
 from spiffworkflow_backend.services.workflow_execution_service import (
     StepDetailLoggingDelegate,
 )
+from spiffworkflow_backend.services.workflow_execution_service import (
+    TaskModelSavingDelegate,
+)
 from spiffworkflow_backend.services.workflow_execution_service import (
     WorkflowExecutionService,
 )
@@ -152,6 +155,10 @@ class SpiffStepDetailIsMissingError(Exception):
     pass


+class TaskNotFoundError(Exception):
+    pass
+
+
 class BoxedTaskDataBasedScriptEngineEnvironment(BoxedTaskDataEnvironment):  # type: ignore
     def __init__(self, environment_globals: Dict[str, Any]):
         """BoxedTaskDataBasedScriptEngineEnvironment."""
@@ -802,7 +809,7 @@ class ProcessInstanceProcessor:
         if start_in_seconds is None:
             start_in_seconds = time.time()

-        task_json = self.get_task_json_from_spiff_task(spiff_task)
+        task_json = self.get_task_dict_from_spiff_task(spiff_task)

         return {
             "process_instance_id": self.process_instance_model.id,
@@ -1034,91 +1041,12 @@ class ProcessInstanceProcessor:
             bpmn_process_definition_parent
         )

-    def _add_bpmn_process(
-        self,
-        bpmn_process_dict: dict,
-        bpmn_process_parent: Optional[BpmnProcessModel] = None,
-        bpmn_process_guid: Optional[str] = None,
-    ) -> BpmnProcessModel:
-        tasks = bpmn_process_dict.pop("tasks")
-        bpmn_process_data = bpmn_process_dict.pop("data")
-
-        bpmn_process = None
-        if bpmn_process_parent is not None:
-            bpmn_process = BpmnProcessModel.query.filter_by(
-                parent_process_id=bpmn_process_parent.id, guid=bpmn_process_guid
-            ).first()
-        elif self.process_instance_model.bpmn_process_id is not None:
-            bpmn_process = self.process_instance_model.bpmn_process
-
-        if bpmn_process is None:
-            bpmn_process = BpmnProcessModel(guid=bpmn_process_guid)
-
-        bpmn_process.properties_json = bpmn_process_dict
-
-        bpmn_process_data_json = json.dumps(bpmn_process_data, sort_keys=True).encode(
-            "utf8"
-        )
-        bpmn_process_data_hash = sha256(bpmn_process_data_json).hexdigest()
-        if bpmn_process.json_data_hash != bpmn_process_data_hash:
-            json_data = (
-                db.session.query(JsonDataModel.id)
-                .filter_by(hash=bpmn_process_data_hash)
-                .first()
-            )
-            if json_data is None:
-                json_data = JsonDataModel(
-                    hash=bpmn_process_data_hash, data=bpmn_process_data
-                )
-                db.session.add(json_data)
-            bpmn_process.json_data_hash = bpmn_process_data_hash
-
-        if bpmn_process_parent is None:
-            self.process_instance_model.bpmn_process = bpmn_process
-        elif bpmn_process.parent_process_id is None:
-            bpmn_process.parent_process_id = bpmn_process_parent.id
-        db.session.add(bpmn_process)
-
-        for task_id, task_properties in tasks.items():
-            task_data_dict = task_properties.pop("data")
-            state_int = task_properties["state"]
-
-            task = TaskModel.query.filter_by(guid=task_id).first()
-            if task is None:
-                # bpmn_process_identifier = task_properties['workflow_name']
-                # bpmn_identifier = task_properties['task_spec']
-                #
-                # task_definition = TaskDefinitionModel.query.filter_by(bpmn_identifier=bpmn_identifier)
-                # .join(BpmnProcessDefinitionModel).filter(BpmnProcessDefinitionModel.bpmn_identifier==bpmn_process_identifier).first()
-                # if task_definition is None:
-                #     subprocess_task = TaskModel.query.filter_by(guid=bpmn_process.guid)
-                task = TaskModel(guid=task_id, bpmn_process_id=bpmn_process.id)
-            task.state = TaskStateNames[state_int]
-            task.properties_json = task_properties
-
-            task_data_json = json.dumps(task_data_dict, sort_keys=True).encode("utf8")
-            task_data_hash = sha256(task_data_json).hexdigest()
-            if task.json_data_hash != task_data_hash:
-                json_data = (
-                    db.session.query(JsonDataModel.id)
-                    .filter_by(hash=task_data_hash)
-                    .first()
-                )
-                if json_data is None:
-                    json_data = JsonDataModel(hash=task_data_hash, data=task_data_dict)
-                    db.session.add(json_data)
-                task.json_data_hash = task_data_hash
-            db.session.add(task)
-
-        return bpmn_process
-
     def _add_bpmn_json_records(self) -> None:
         """Adds serialized_bpmn_definition and process_instance_data records to the db session.

         Expects the save method to commit it.
""" bpmn_dict = json.loads(self.serialize()) - # with open('tmp2.json', 'w') as f: f.write(json.dumps(bpmn_dict) bpmn_dict_keys = ("spec", "subprocess_specs", "serializer_version") process_instance_data_dict = {} bpmn_spec_dict = {} @@ -1132,14 +1060,14 @@ class ProcessInstanceProcessor: # if self.process_instance_model.bpmn_process_definition_id is None: self._add_bpmn_process_definitions(bpmn_spec_dict) - # FIXME: Update tasks in the did_complete_task instead to set the final info. - # We will need to somehow cache all tasks initially though before each task is run. - # Maybe always do this for first run - just need to know it's the first run. subprocesses = process_instance_data_dict.pop("subprocesses") - bpmn_process_parent = self._add_bpmn_process(process_instance_data_dict) + bpmn_process_parent = TaskService.add_bpmn_process( + process_instance_data_dict, self.process_instance_model + ) for subprocess_task_id, subprocess_properties in subprocesses.items(): - self._add_bpmn_process( + TaskService.add_bpmn_process( subprocess_properties, + self.process_instance_model, bpmn_process_parent, bpmn_process_guid=subprocess_task_id, ) @@ -1691,8 +1619,13 @@ class ProcessInstanceProcessor: step_delegate = StepDetailLoggingDelegate( self.increment_spiff_step, spiff_step_details_mapping_builder ) + task_model_delegate = TaskModelSavingDelegate( + secondary_engine_step_delegate=step_delegate, + serializer=self._serializer, + process_instance=self.process_instance_model, + ) execution_strategy = execution_strategy_named( - execution_strategy_name, step_delegate + execution_strategy_name, task_model_delegate ) execution_service = WorkflowExecutionService( self.bpmn_process_instance, @@ -1871,7 +1804,7 @@ class ProcessInstanceProcessor: ) return user_tasks # type: ignore - def get_task_json_from_spiff_task(self, spiff_task: SpiffTask) -> dict[str, Any]: + def get_task_dict_from_spiff_task(self, spiff_task: SpiffTask) -> dict[str, Any]: default_registry = DefaultRegistry() task_data = default_registry.convert(spiff_task.data) python_env = default_registry.convert( @@ -1884,17 +1817,29 @@ class ProcessInstanceProcessor: return task_json def complete_task( - self, task: SpiffTask, human_task: HumanTaskModel, user: UserModel + self, spiff_task: SpiffTask, human_task: HumanTaskModel, user: UserModel ) -> None: """Complete_task.""" - self.bpmn_process_instance.complete_task_from_id(task.id) + task_model = TaskModel.query.filter_by(guid=human_task.task_id).first() + if task_model is None: + raise TaskNotFoundError( + "Cannot find a task with guid" + f" {self.process_instance_model.id} and task_id is {human_task.task_id}" + ) + + task_model.start_in_seconds = time.time() + self.bpmn_process_instance.complete_task_from_id(spiff_task.id) + task_model.end_in_seconds = time.time() + human_task.completed_by_user_id = user.id human_task.completed = True db.session.add(human_task) + + # FIXME: remove when we switch over to using tasks only details_model = ( SpiffStepDetailsModel.query.filter_by( process_instance_id=self.process_instance_model.id, - task_id=str(task.id), + task_id=str(spiff_task.id), task_state="READY", ) .order_by(SpiffStepDetailsModel.id.desc()) # type: ignore @@ -1903,13 +1848,19 @@ class ProcessInstanceProcessor: if details_model is None: raise SpiffStepDetailIsMissingError( "Cannot find a ready spiff_step_detail entry for process instance" - f" {self.process_instance_model.id} and task_id is {task.id}" + f" {self.process_instance_model.id} and task_id is {spiff_task.id}" ) - 
-        details_model.task_state = task.get_state_name()
+        details_model.task_state = spiff_task.get_state_name()
         details_model.end_in_seconds = time.time()
-        details_model.task_json = self.get_task_json_from_spiff_task(task)
+        details_model.task_json = self.get_task_dict_from_spiff_task(spiff_task)
         db.session.add(details_model)
+        # #######
+
+        TaskService.update_task_model_and_add_to_db_session(
+            task_model, spiff_task, self._serializer
+        )
+
+        # this is the thing that actually commits the db transaction (on behalf of the other updates above as well)
         self.save()
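Before reading the new service below, it helps to know the dict shape TaskService.add_bpmn_process consumes. This is a rough sketch inferred from the pop() calls in the new module; the keys shown are only the ones the service touches, and the values are illustrative, not real serializations (which carry many more fields):

    # Hypothetical, trimmed-down serialization of one bpmn process:
    serialized_bpmn_process = {
        "data": {"the_new_var": "HEY"},  # popped; stored as a content-addressed JsonDataModel row
        "tasks": {
            "some-task-guid": {  # one TaskModel row per entry
                "state": 32,  # a TaskState int, converted to a name via TaskStateNames
                "data": {"the_new_var": "HEY"},  # popped; per-task JsonDataModel row
            },
        },
        # everything left over becomes bpmn_process.properties_json
    }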
+ """ + new_properties_json = serializer.task_to_dict(spiff_task) + spiff_task_data = new_properties_json.pop("data") + task_model.properties_json = new_properties_json + task_model.state = TaskStateNames[new_properties_json["state"]] + cls.update_task_data_on_task_model(task_model, spiff_task_data) + db.session.add(task_model) + + @classmethod + def find_or_create_task_model_from_spiff_task( + cls, + spiff_task: SpiffTask, + process_instance: ProcessInstanceModel, + serializer: BpmnWorkflowSerializer, + ) -> TaskModel: + spiff_task_guid = str(spiff_task.id) + task_model: Optional[TaskModel] = TaskModel.query.filter_by( + guid=spiff_task_guid + ).first() + if task_model is None: + bpmn_process = cls.task_bpmn_process( + spiff_task, process_instance, serializer + ) + task_model = TaskModel.query.filter_by(guid=spiff_task_guid).first() + if task_model is None: + task_model = TaskModel( + guid=spiff_task_guid, bpmn_process_id=bpmn_process.id + ) + return task_model + + @classmethod + def task_subprocess( + cls, spiff_task: SpiffTask + ) -> Tuple[Optional[str], Optional[BpmnWorkflow]]: + top_level_workflow = spiff_task.workflow._get_outermost_workflow() + my_wf = spiff_task.workflow # This is the workflow the spiff_task is part of + my_sp = None + my_sp_id = None + if my_wf != top_level_workflow: + # All the subprocesses are at the top level, so you can just compare them + for sp_id, sp in top_level_workflow.subprocesses.items(): + if sp == my_wf: + my_sp = sp + my_sp_id = sp_id + break + return (str(my_sp_id), my_sp) + + @classmethod + def task_bpmn_process( + cls, + spiff_task: SpiffTask, + process_instance: ProcessInstanceModel, + serializer: BpmnWorkflowSerializer, + ) -> BpmnProcessModel: + subprocess_guid, subprocess = cls.task_subprocess(spiff_task) + bpmn_process: Optional[BpmnProcessModel] = None + if subprocess is None: + bpmn_process = process_instance.bpmn_process + # This is the top level workflow, which has no guid + # check for bpmn_process_id because mypy doesn't realize bpmn_process can be None + if process_instance.bpmn_process_id is None: + bpmn_process = cls.add_bpmn_process( + serializer.workflow_to_dict( + spiff_task.workflow._get_outermost_workflow() + ), + process_instance, + ) + db.session.commit() + else: + bpmn_process = BpmnProcessModel.query.filter_by( + guid=subprocess_guid + ).first() + if bpmn_process is None: + bpmn_process = cls.add_bpmn_process( + serializer.workflow_to_dict(subprocess), + process_instance, + process_instance.bpmn_process, + subprocess_guid, + ) + db.session.commit() + return bpmn_process + + @classmethod + def add_bpmn_process( + cls, + bpmn_process_dict: dict, + process_instance: ProcessInstanceModel, + bpmn_process_parent: Optional[BpmnProcessModel] = None, + bpmn_process_guid: Optional[str] = None, + ) -> BpmnProcessModel: + tasks = bpmn_process_dict.pop("tasks") + bpmn_process_data_dict = bpmn_process_dict.pop("data") + + bpmn_process = None + if bpmn_process_parent is not None: + bpmn_process = BpmnProcessModel.query.filter_by( + parent_process_id=bpmn_process_parent.id, guid=bpmn_process_guid + ).first() + elif process_instance.bpmn_process_id is not None: + bpmn_process = process_instance.bpmn_process + + bpmn_process_is_new = False + if bpmn_process is None: + bpmn_process_is_new = True + bpmn_process = BpmnProcessModel(guid=bpmn_process_guid) + + bpmn_process.properties_json = bpmn_process_dict + + bpmn_process_data_json = json.dumps(bpmn_process_data_dict, sort_keys=True) + bpmn_process_data_hash = sha256( + 
+        bpmn_process_data_hash = sha256(
+            bpmn_process_data_json.encode("utf8")
+        ).hexdigest()
+        if bpmn_process.json_data_hash != bpmn_process_data_hash:
+            json_data = (
+                db.session.query(JsonDataModel.id)
+                .filter_by(hash=bpmn_process_data_hash)
+                .first()
+            )
+            if json_data is None:
+                json_data = JsonDataModel(
+                    hash=bpmn_process_data_hash, data=bpmn_process_data_dict
+                )
+                db.session.add(json_data)
+            bpmn_process.json_data_hash = bpmn_process_data_hash
+
+        if bpmn_process_parent is None:
+            process_instance.bpmn_process = bpmn_process
+        elif bpmn_process.parent_process_id is None:
+            bpmn_process.parent_process_id = bpmn_process_parent.id
+        db.session.add(bpmn_process)
+
+        if bpmn_process_is_new:
+            for task_id, task_properties in tasks.items():
+                task_data_dict = task_properties.pop("data")
+                state_int = task_properties["state"]
+
+                task_model = TaskModel.query.filter_by(guid=task_id).first()
+                if task_model is None:
+                    # bpmn_process_identifier = task_properties['workflow_name']
+                    # bpmn_identifier = task_properties['task_spec']
+                    #
+                    # task_definition = TaskDefinitionModel.query.filter_by(bpmn_identifier=bpmn_identifier)
+                    # .join(BpmnProcessDefinitionModel).filter(BpmnProcessDefinitionModel.bpmn_identifier==bpmn_process_identifier).first()
+                    # if task_definition is None:
+                    #     subprocess_task = TaskModel.query.filter_by(guid=bpmn_process.guid)
+                    task_model = TaskModel(
+                        guid=task_id, bpmn_process_id=bpmn_process.id
+                    )
+                task_model.state = TaskStateNames[state_int]
+                task_model.properties_json = task_properties
+
+                TaskService.update_task_data_on_task_model(task_model, task_data_dict)
+                db.session.add(task_model)
+
+        return bpmn_process
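The hashing pattern above is used for both task data and bpmn process data: JSON blobs are stored content-addressed, keyed by the sha256 of a canonical serialization, so identical dicts share a single JsonDataModel row and unchanged data is never rewritten. The core of it as a self-contained sketch (content_address is a hypothetical name for illustration, not a function in this change):

    import json
    from hashlib import sha256

    def content_address(data: dict) -> str:
        # sort_keys=True makes the serialization canonical, so dicts with the
        # same contents always produce the same hash/key
        return sha256(json.dumps(data, sort_keys=True).encode("utf8")).hexdigest()

    assert content_address({"a": 1, "b": 2}) == content_address({"b": 2, "a": 1})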
diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py
index 576dee1ba..ef484fe66 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py
@@ -2,7 +2,9 @@ import logging
 import time
 from typing import Callable
 from typing import List
+from typing import Optional

+from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflowSerializer  # type: ignore
 from SpiffWorkflow.bpmn.workflow import BpmnWorkflow  # type: ignore
 from SpiffWorkflow.exceptions import SpiffWorkflowException  # type: ignore
 from SpiffWorkflow.task import Task as SpiffTask  # type: ignore
@@ -16,18 +18,23 @@ from spiffworkflow_backend.models.message_instance_correlation import (
 )
 from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
 from spiffworkflow_backend.models.spiff_step_details import SpiffStepDetailsModel
+from spiffworkflow_backend.models.task import TaskModel  # noqa: F401
+from spiffworkflow_backend.services.task_service import TaskService


 class EngineStepDelegate:
     """Interface of sorts for a concrete engine step delegate."""

-    def will_complete_task(self, task: SpiffTask) -> None:
+    def will_complete_task(self, spiff_task: SpiffTask) -> None:
         pass

-    def did_complete_task(self, task: SpiffTask) -> None:
+    def did_complete_task(self, spiff_task: SpiffTask) -> None:
         pass

-    def save(self) -> None:
+    def save(self, commit: bool = False) -> None:
+        pass
+
+    def after_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> None:
         pass


@@ -35,6 +42,75 @@ SpiffStepIncrementer = Callable[[], None]
 SpiffStepDetailsMappingBuilder = Callable[[SpiffTask, float, float], dict]


+class TaskModelSavingDelegate(EngineStepDelegate):
+    """Engine step delegate that takes care of saving a task model to the database.
+
+    It can also be given another EngineStepDelegate.
+    """
+
+    def __init__(
+        self,
+        serializer: BpmnWorkflowSerializer,
+        process_instance: ProcessInstanceModel,
+        secondary_engine_step_delegate: Optional[EngineStepDelegate] = None,
+    ) -> None:
+        self.secondary_engine_step_delegate = secondary_engine_step_delegate
+        self.process_instance = process_instance
+
+        self.current_task_model: Optional[TaskModel] = None
+        self.serializer = serializer
+
+    def should_update_task_model(self) -> bool:
+        """We need to figure out if we have previously saved task info on this process instance.
+
+        Use the bpmn_process_id to do this.
+        """
+        return self.process_instance.bpmn_process_id is not None
+        # return False
+
+    def will_complete_task(self, spiff_task: SpiffTask) -> None:
+        if self.should_update_task_model():
+            self.current_task_model = (
+                TaskService.find_or_create_task_model_from_spiff_task(
+                    spiff_task, self.process_instance, self.serializer
+                )
+            )
+            self.current_task_model.start_in_seconds = time.time()
+        if self.secondary_engine_step_delegate:
+            self.secondary_engine_step_delegate.will_complete_task(spiff_task)
+
+    def did_complete_task(self, spiff_task: SpiffTask) -> None:
+        if self.current_task_model and self.should_update_task_model():
+            self.current_task_model.end_in_seconds = time.time()
+            TaskService.update_task_model_and_add_to_db_session(
+                self.current_task_model, spiff_task, self.serializer
+            )
+            db.session.add(self.current_task_model)
+        if self.secondary_engine_step_delegate:
+            self.secondary_engine_step_delegate.did_complete_task(spiff_task)
+
+    def save(self, _commit: bool = True) -> None:
+        if self.secondary_engine_step_delegate:
+            self.secondary_engine_step_delegate.save(commit=False)
+        db.session.commit()
+
+    def after_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> None:
+        for waiting_spiff_task in bpmn_process_instance.get_tasks(
+            TaskState.WAITING | TaskState.CANCELLED
+        ):
+            task_model = TaskModel.query.filter_by(
+                guid=str(waiting_spiff_task.id)
+            ).first()
+            if task_model is None:
+                task_model = TaskService.find_or_create_task_model_from_spiff_task(
+                    waiting_spiff_task, self.process_instance, self.serializer
+                )
+            TaskService.update_task_model_and_add_to_db_session(
+                task_model, waiting_spiff_task, self.serializer
+            )
+        db.session.commit()
+
+
 class StepDetailLoggingDelegate(EngineStepDelegate):
     """Engine step delegate that takes care of logging spiff step details.
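For orientation: TaskModelSavingDelegate composes with, rather than replaces, the step-detail logging delegate below; the processor passes StepDetailLoggingDelegate in as secondary_engine_step_delegate (see the process_instance_processor.py hunk earlier), so both delegates see every hook. A minimal sketch of the call order during one engine step; run_engine_step is a hypothetical stand-in for the execution strategy loop, not code from this change:

    def run_engine_step(spiff_task, delegate):
        # delegate: a TaskModelSavingDelegate wrapping a StepDetailLoggingDelegate
        delegate.will_complete_task(spiff_task)  # find/create TaskModel, stamp start_in_seconds, forward
        spiff_task.complete()                    # SpiffWorkflow executes the task itself
        delegate.did_complete_task(spiff_task)   # stamp end_in_seconds, serialize state + data, forward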
@@ -65,28 +141,29 @@ class StepDetailLoggingDelegate(EngineStepDelegate):
         "Transactional Subprocess",
     }

-    def should_log(self, task: SpiffTask) -> bool:
+    def should_log(self, spiff_task: SpiffTask) -> bool:
         return (
-            task.task_spec.spec_type in self.tasks_to_log
-            and not task.task_spec.name.endswith(".EndJoin")
+            spiff_task.task_spec.spec_type in self.tasks_to_log
+            and not spiff_task.task_spec.name.endswith(".EndJoin")
         )

-    def will_complete_task(self, task: SpiffTask) -> None:
-        if self.should_log(task):
+    def will_complete_task(self, spiff_task: SpiffTask) -> None:
+        if self.should_log(spiff_task):
             self.current_task_start_in_seconds = time.time()
             self.increment_spiff_step()

-    def did_complete_task(self, task: SpiffTask) -> None:
-        if self.should_log(task):
+    def did_complete_task(self, spiff_task: SpiffTask) -> None:
+        if self.should_log(spiff_task):
             self.step_details.append(
                 self.spiff_step_details_mapping(
-                    task, self.current_task_start_in_seconds, time.time()
+                    spiff_task, self.current_task_start_in_seconds, time.time()
                 )
             )

-    def save(self) -> None:
+    def save(self, commit: bool = True) -> None:
         db.session.bulk_insert_mappings(SpiffStepDetailsModel, self.step_details)
-        db.session.commit()
+        if commit:
+            db.session.commit()


 class ExecutionStrategy:
@@ -116,6 +193,7 @@ class GreedyExecutionStrategy(ExecutionStrategy):
             will_complete_task=self.delegate.will_complete_task,
             did_complete_task=self.delegate.did_complete_task,
         )
+        self.delegate.after_engine_steps(bpmn_process_instance)


 class RunUntilServiceTaskExecutionStrategy(ExecutionStrategy):
@@ -136,12 +214,12 @@ class RunUntilServiceTaskExecutionStrategy(ExecutionStrategy):
             ]
         )
         while engine_steps:
-            for task in engine_steps:
-                if task.task_spec.spec_type == "Service Task":
+            for spiff_task in engine_steps:
+                if spiff_task.task_spec.spec_type == "Service Task":
                     return
-                self.delegate.will_complete_task(task)
-                task.complete()
-                self.delegate.did_complete_task(task)
+                self.delegate.will_complete_task(spiff_task)
+                spiff_task.complete()
+                self.delegate.did_complete_task(spiff_task)

             engine_steps = list(
                 [
@@ -151,6 +229,8 @@ class RunUntilServiceTaskExecutionStrategy(ExecutionStrategy):
                 ]
             )

+        self.delegate.after_engine_steps(bpmn_process_instance)
+

 def execution_strategy_named(
     name: str, delegate: EngineStepDelegate
@@ -224,6 +304,13 @@ class WorkflowExecutionService:
                     correlation_keys=self.bpmn_process_instance.correlations,
                 )
                 db.session.add(message_instance)
+
+        bpmn_process = self.process_instance_model.bpmn_process
+        if bpmn_process is not None:
+            bpmn_process_correlations = self.bpmn_process_instance.correlations
+            bpmn_process.properties_json["correlations"] = bpmn_process_correlations
+            db.session.add(bpmn_process)
+
         db.session.commit()

     def queue_waiting_receive_messages(self) -> None:
@@ -261,6 +348,14 @@ class WorkflowExecutionService:
             )
             message_instance.correlation_rules.append(message_correlation)
             db.session.add(message_instance)
+
+        bpmn_process = self.process_instance_model.bpmn_process
+
+        if bpmn_process is not None:
+            bpmn_process_correlations = self.bpmn_process_instance.correlations
+            bpmn_process.properties_json["correlations"] = bpmn_process_correlations
+            db.session.add(bpmn_process)
+
         db.session.commit()
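The widened EngineStepDelegate interface (save now takes a commit flag, and after_engine_steps is new) keeps further delegates easy to write. A hypothetical minimal conformer, for illustration only and not part of this change:

    import time
    from typing import Dict

    from SpiffWorkflow.task import Task as SpiffTask  # type: ignore

    from spiffworkflow_backend.services.workflow_execution_service import (
        EngineStepDelegate,
    )

    class TimingOnlyDelegate(EngineStepDelegate):
        """Hypothetical delegate that only records per-task wall-clock durations."""

        def __init__(self) -> None:
            self.durations: Dict[str, float] = {}
            self._start = 0.0

        def will_complete_task(self, spiff_task: SpiffTask) -> None:
            self._start = time.time()

        def did_complete_task(self, spiff_task: SpiffTask) -> None:
            self.durations[str(spiff_task.id)] = time.time() - self._start

        # save() and after_engine_steps() inherit the base-class no-ops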
diff --git a/spiffworkflow-backend/tests/data/manual_task/manual_task.bpmn b/spiffworkflow-backend/tests/data/manual_task/manual_task.bpmn
index 4f0fba72c..f4d0190bc 100644
--- a/spiffworkflow-backend/tests/data/manual_task/manual_task.bpmn
+++ b/spiffworkflow-backend/tests/data/manual_task/manual_task.bpmn
@@ -2,39 +2,52 @@
[BPMN XML hunk unrecoverable in this capture; the markup was stripped. Recoverable intent: a new script task (script: the_new_var = "HEY") is inserted ahead of the existing "## Hello" manual task, replacing flow Flow_1xlck7g with Flow_0stlaxe -> Flow_1pmem7s -> Flow_0nnh2x9, plus the matching diagram-layout updates.]

diff --git a/spiffworkflow-backend/tests/data/manual_task_with_subprocesses/manual_task_with_subprocesses.bpmn b/spiffworkflow-backend/tests/data/manual_task_with_subprocesses/manual_task_with_subprocesses.bpmn
new file mode 100644
index 000000000..d1b462f12
--- /dev/null
+++ b/spiffworkflow-backend/tests/data/manual_task_with_subprocesses/manual_task_with_subprocesses.bpmn
@@ -0,0 +1,112 @@
[New BPMN file; XML unrecoverable in this capture. Recoverable intent: a top-level process in which a script task (set_in_top_level_script = 1, flows Flow_0stlaxe -> Flow_1fktmf7) leads to a "## Hello" manual task (Flow_1fktmf7 -> Flow_09gjylo), then a subprocess whose own script task sets set_in_top_level_subprocess = 1 (flows Flow_00k1tii -> Flow_1b4o55k), then a call activity invoking test_process_to_call (Flow_1i7syph -> Flow_1and8ze), ending at an end event.]

diff --git a/spiffworkflow-backend/tests/data/manual_task_with_subprocesses/test_process_to_call.bpmn b/spiffworkflow-backend/tests/data/manual_task_with_subprocesses/test_process_to_call.bpmn
new file mode 100644
index 000000000..25b37c619
--- /dev/null
+++ b/spiffworkflow-backend/tests/data/manual_task_with_subprocesses/test_process_to_call.bpmn
@@ -0,0 +1,39 @@
[New BPMN file; XML unrecoverable in this capture. Recoverable intent: a start event, a script task setting set_in_test_process_to_call_script = 1 (flows Flow_06g687y -> Flow_01e21r0), and an end event.]
diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py
index 75eb31464..f124bcd96 100644
--- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py
+++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py
@@ -10,6 +10,7 @@ from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
 from spiffworkflow_backend.models.group import GroupModel
 from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
 from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
+from spiffworkflow_backend.models.task import TaskModel  # noqa: F401
 from spiffworkflow_backend.models.user import UserModel
 from spiffworkflow_backend.services.authorization_service import AuthorizationService
 from spiffworkflow_backend.services.authorization_service import (
@@ -292,6 +293,89 @@ class TestProcessInstanceProcessor(BaseTest):
         assert spiff_task is not None
         assert spiff_task.state == TaskState.COMPLETED

+    def test_properly_saves_tasks_when_running(
+        self,
+        app: Flask,
+        client: FlaskClient,
+        with_db_and_bpmn_file_cleanup: None,
+        with_super_admin_user: UserModel,
+    ) -> None:
+        """Test_properly_saves_tasks_when_running."""
+        self.create_process_group(
+            client, with_super_admin_user, "test_group", "test_group"
+        )
+        initiator_user = self.find_or_create_user("initiator_user")
+        finance_user_three = self.find_or_create_user("testuser3")
+        assert initiator_user.principal is not None
+        assert finance_user_three.principal is not None
+        AuthorizationService.import_permissions_from_yaml_file()
+
+        finance_group = GroupModel.query.filter_by(identifier="Finance Team").first()
+        assert finance_group is not None
+
+        process_model = load_test_spec(
+            process_model_id="test_group/manual_task_with_subprocesses",
+            process_model_source_directory="manual_task_with_subprocesses",
+        )
+        process_instance = self.create_process_instance_from_process_model(
+            process_model=process_model, user=initiator_user
+        )
+        processor = ProcessInstanceProcessor(process_instance)
+        processor.do_engine_steps(save=True)
+        assert len(process_instance.active_human_tasks) == 1
+        initial_human_task_id = process_instance.active_human_tasks[0].id
+
+        # save again to ensure we attempt to process the human tasks again
+        processor.save()
+
+        assert len(process_instance.active_human_tasks) == 1
+        assert initial_human_task_id == process_instance.active_human_tasks[0].id
+
+        processor = ProcessInstanceProcessor(process_instance)
+        human_task_one = process_instance.active_human_tasks[0]
+        spiff_manual_task = processor.__class__.get_task_by_bpmn_identifier(
+            human_task_one.task_name, processor.bpmn_process_instance
+        )
+        ProcessInstanceService.complete_form_task(
+            processor, spiff_manual_task, {}, initiator_user, human_task_one
+        )
+
+        # look the process instance up again to ensure all bpmn json is recreated from scratch from the db
+        process_instance_relookup = ProcessInstanceModel.query.filter_by(
+            id=process_instance.id
+        ).first()
+        processor_final = ProcessInstanceProcessor(process_instance_relookup)
+        assert process_instance_relookup.status == "complete"
+
+        first_data_set = {"set_in_top_level_script": 1}
+        second_data_set = {**first_data_set, **{"set_in_top_level_subprocess": 1}}
+        third_data_set = {
+            **second_data_set,
+            **{"set_in_test_process_to_call_script": 1},
+        }
+        expected_task_data = {
+            "top_level_script": first_data_set,
+            "manual_task": first_data_set,
+            "top_level_subprocess_script": second_data_set,
+            "top_level_subprocess": second_data_set,
+            "test_process_to_call_script": third_data_set,
+            "top_level_call_activity": third_data_set,
+            "end_event_of_manual_task_model": third_data_set,
+        }
+
+        all_spiff_tasks = processor_final.bpmn_process_instance.get_tasks()
+        assert len(all_spiff_tasks) > 1
+        for spiff_task in all_spiff_tasks:
+            assert spiff_task.state == TaskState.COMPLETED
+            spiff_task_name = spiff_task.task_spec.name
+            if spiff_task_name in expected_task_data:
+                spiff_task_data = expected_task_data[spiff_task_name]
+                failure_message = (
+                    f"Found unexpected task data on {spiff_task_name}. "
+                    f"Expected: {spiff_task_data}, Found: {spiff_task.data}"
+                )
+                assert spiff_task.data == spiff_task_data, failure_message
+
     def test_does_not_recreate_human_tasks_on_multiple_saves(
         self,
         app: Flask,
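Assuming the backend's usual poetry-based setup, something along these lines should run just the new test in isolation (the exact invocation is an assumption; adjust to your environment):

    cd spiffworkflow-backend
    poetry run pytest tests/spiffworkflow_backend/unit/test_process_instance_processor.py -k test_properly_saves_tasks_when_running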