diff --git a/.flake8 b/.flake8
index fe5c9a94a..9c54dc0e7 100644
--- a/.flake8
+++ b/.flake8
@@ -17,10 +17,10 @@ per-file-ignores =
     # THEN, test_hey.py will NOT be excluding D103
 
     # asserts are ok in tests
-    spiffworkflow-backend/tests/*:S101,D100,D101,D102,D103
+    spiffworkflow-backend/tests/*:S101,D100,D101,D102,D103,D107
 
     # prefer naming functions descriptively rather than forcing comments
-    spiffworkflow-backend/src/*:D100,D101,D102,D103
+    spiffworkflow-backend/src/*:D100,D101,D102,D103,D107
 
     spiffworkflow-backend/bin/keycloak_test_server.py:B950,D
     spiffworkflow-backend/conftest.py:S105
diff --git a/spiffworkflow-backend/.flake8 b/spiffworkflow-backend/.flake8
index 0fceed158..2e6554e58 100644
--- a/spiffworkflow-backend/.flake8
+++ b/spiffworkflow-backend/.flake8
@@ -17,10 +17,10 @@ per-file-ignores =
     # THEN, test_hey.py will NOT be excluding D103
 
    # asserts are ok in tests
-    tests/*:S101,D100,D101,D102,D103
+    tests/*:S101,D100,D101,D102,D103,D107
 
    # prefer naming functions descriptively rather than forcing comments
-    src/*:D100,D101,D102,D103
+    src/*:D100,D101,D102,D103,D107
 
    bin/keycloak_test_server.py:B950,D
    conftest.py:S105
diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py
index 30f8a481c..eeee7dc68 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py
@@ -93,6 +93,7 @@ from spiffworkflow_backend.services.file_system_service import FileSystemService
 from spiffworkflow_backend.services.process_model_service import ProcessModelService
 from spiffworkflow_backend.services.service_task_service import ServiceTaskDelegate
 from spiffworkflow_backend.services.spec_file_service import SpecFileService
+from spiffworkflow_backend.services.task_service import TaskService
 from spiffworkflow_backend.services.user_service import UserService
 from spiffworkflow_backend.services.workflow_execution_service import (
     execution_strategy_named,
@@ -100,6 +101,9 @@ from spiffworkflow_backend.services.workflow_execution_service import (
 from spiffworkflow_backend.services.workflow_execution_service import (
     StepDetailLoggingDelegate,
 )
+from spiffworkflow_backend.services.workflow_execution_service import (
+    TaskModelSavingDelegate,
+)
 from spiffworkflow_backend.services.workflow_execution_service import (
     WorkflowExecutionService,
 )
@@ -152,6 +156,10 @@ class SpiffStepDetailIsMissingError(Exception):
     pass
 
 
+class TaskNotFoundError(Exception):
+    pass
+
+
 class BoxedTaskDataBasedScriptEngineEnvironment(BoxedTaskDataEnvironment):  # type: ignore
     def __init__(self, environment_globals: Dict[str, Any]):
         """BoxedTaskDataBasedScriptEngineEnvironment."""
@@ -802,7 +810,7 @@ class ProcessInstanceProcessor:
         if start_in_seconds is None:
             start_in_seconds = time.time()
 
-        task_json = self.get_task_json_from_spiff_task(spiff_task)
+        task_json = self.get_task_dict_from_spiff_task(spiff_task)
 
         return {
             "process_instance_id": self.process_instance_model.id,
@@ -1083,8 +1091,8 @@ class ProcessInstanceProcessor:
         task_data_dict = task_properties.pop("data")
         state_int = task_properties["state"]
 
-        task = TaskModel.query.filter_by(guid=task_id).first()
-        if task is None:
+        task_model = TaskModel.query.filter_by(guid=task_id).first()
+        if task_model is None:
             # bpmn_process_identifier = task_properties['workflow_name']
             # bpmn_identifier = task_properties['task_spec']
             #
@@ -1092,23 +1100,12 @@ class ProcessInstanceProcessor:
             #     .join(BpmnProcessDefinitionModel).filter(BpmnProcessDefinitionModel.bpmn_identifier==bpmn_process_identifier).first()
             # if task_definition is None:
             #     subprocess_task = TaskModel.query.filter_by(guid=bpmn_process.guid)
-            task = TaskModel(guid=task_id, bpmn_process_id=bpmn_process.id)
-        task.state = TaskStateNames[state_int]
-        task.properties_json = task_properties
+            task_model = TaskModel(guid=task_id, bpmn_process_id=bpmn_process.id)
+        task_model.state = TaskStateNames[state_int]
+        task_model.properties_json = task_properties
 
-        task_data_json = json.dumps(task_data_dict, sort_keys=True).encode("utf8")
-        task_data_hash = sha256(task_data_json).hexdigest()
-        if task.json_data_hash != task_data_hash:
-            json_data = (
-                db.session.query(JsonDataModel.id)
-                .filter_by(hash=task_data_hash)
-                .first()
-            )
-            if json_data is None:
-                json_data = JsonDataModel(hash=task_data_hash, data=task_data_dict)
-                db.session.add(json_data)
-            task.json_data_hash = task_data_hash
-        db.session.add(task)
+        TaskService.update_task_data_on_task_model(task_model, task_data_dict)
+        db.session.add(task_model)
 
         return bpmn_process
@@ -1135,14 +1132,15 @@ class ProcessInstanceProcessor:
         # FIXME: Update tasks in the did_complete_task instead to set the final info.
         # We will need to somehow cache all tasks initially though before each task is run.
         # Maybe always do this for first run - just need to know it's the first run.
-        subprocesses = process_instance_data_dict.pop("subprocesses")
-        bpmn_process_parent = self._add_bpmn_process(process_instance_data_dict)
-        for subprocess_task_id, subprocess_properties in subprocesses.items():
-            self._add_bpmn_process(
-                subprocess_properties,
-                bpmn_process_parent,
-                bpmn_process_guid=subprocess_task_id,
-            )
+        if self.process_instance_model.bpmn_process_id is None:
+            subprocesses = process_instance_data_dict.pop("subprocesses")
+            bpmn_process_parent = self._add_bpmn_process(process_instance_data_dict)
+            for subprocess_task_id, subprocess_properties in subprocesses.items():
+                self._add_bpmn_process(
+                    subprocess_properties,
+                    bpmn_process_parent,
+                    bpmn_process_guid=subprocess_task_id,
+                )
 
     def save(self) -> None:
         """Saves the current state of this processor to the database."""
@@ -1691,8 +1689,13 @@ class ProcessInstanceProcessor:
         step_delegate = StepDetailLoggingDelegate(
             self.increment_spiff_step, spiff_step_details_mapping_builder
         )
+        task_model_delegate = TaskModelSavingDelegate(
+            secondary_engine_step_delegate=step_delegate,
+            serializer=self._serializer,
+            process_instance=self.process_instance_model,
+        )
         execution_strategy = execution_strategy_named(
-            execution_strategy_name, step_delegate
+            execution_strategy_name, task_model_delegate
         )
         execution_service = WorkflowExecutionService(
             self.bpmn_process_instance,
@@ -1871,7 +1874,7 @@ class ProcessInstanceProcessor:
         )
         return user_tasks  # type: ignore
 
-    def get_task_json_from_spiff_task(self, spiff_task: SpiffTask) -> dict[str, Any]:
+    def get_task_dict_from_spiff_task(self, spiff_task: SpiffTask) -> dict[str, Any]:
         default_registry = DefaultRegistry()
         task_data = default_registry.convert(spiff_task.data)
         python_env = default_registry.convert(
@@ -1884,17 +1887,29 @@ class ProcessInstanceProcessor:
         return task_json
 
     def complete_task(
-        self, task: SpiffTask, human_task: HumanTaskModel, user: UserModel
+        self, spiff_task: SpiffTask, human_task: HumanTaskModel, user: UserModel
     ) -> None:
         """Complete_task."""
-        self.bpmn_process_instance.complete_task_from_id(task.id)
+        task_model = TaskModel.query.filter_by(guid=human_task.task_id).first()
+        if task_model is None:
+            raise TaskNotFoundError(
+                "Cannot find a task with guid"
+                f" {human_task.task_id} for process instance {self.process_instance_model.id}"
+            )
+
+        task_model.start_in_seconds = time.time()
+        self.bpmn_process_instance.complete_task_from_id(spiff_task.id)
+        task_model.end_in_seconds = time.time()
+
         human_task.completed_by_user_id = user.id
         human_task.completed = True
         db.session.add(human_task)
+
+        # FIXME: remove when we switch over to using tasks only
         details_model = (
             SpiffStepDetailsModel.query.filter_by(
                 process_instance_id=self.process_instance_model.id,
-                task_id=str(task.id),
+                task_id=str(spiff_task.id),
                 task_state="READY",
             )
             .order_by(SpiffStepDetailsModel.id.desc())  # type: ignore
@@ -1903,13 +1918,19 @@ class ProcessInstanceProcessor:
         if details_model is None:
             raise SpiffStepDetailIsMissingError(
                 "Cannot find a ready spiff_step_detail entry for process instance"
-                f" {self.process_instance_model.id} and task_id is {task.id}"
+                f" {self.process_instance_model.id} and task_id is {spiff_task.id}"
             )
 
-        details_model.task_state = task.get_state_name()
+        details_model.task_state = spiff_task.get_state_name()
         details_model.end_in_seconds = time.time()
-        details_model.task_json = self.get_task_json_from_spiff_task(task)
+        details_model.task_json = self.get_task_dict_from_spiff_task(spiff_task)
         db.session.add(details_model)
+        # #######
+
+        TaskService.update_task_model_and_add_to_db_session(
+            task_model, spiff_task, self._serializer
+        )
+
         # this is the thing that actually commits the db transaction (on behalf of the other updates above as well)
         self.save()
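The dedup logic deleted from `_add_bpmn_process` above moves into the new `TaskService.update_task_data_on_task_model` below: task data is stored content-addressed, serialized with sorted keys so equal dicts always hash identically, and a `json_data` row is only inserted when the hash is new. A minimal standalone sketch of that hashing idea (the `task_data_hash` helper name is illustrative, not part of the diff):

```python
import json
from hashlib import sha256


def task_data_hash(task_data: dict) -> str:
    """Canonical hash: sorted keys make equal dicts serialize identically."""
    return sha256(json.dumps(task_data, sort_keys=True).encode("utf8")).hexdigest()


# Tasks carrying identical data map to the same hash, so they can share a
# single json_data row instead of storing duplicate payloads per task.
assert task_data_hash({"a": 1, "b": 2}) == task_data_hash({"b": 2, "a": 1})
assert task_data_hash({"a": 1}) != task_data_hash({"a": 2})
```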
+ """ + new_properties_json = serializer.task_to_dict(spiff_task) + task_model.properties_json = new_properties_json + task_model.state = TaskStateNames[new_properties_json['state']] + cls.update_task_data_on_task_model(task_model, spiff_task.data) + db.session.add(task_model) + + @classmethod + def find_or_create_task_model_from_spiff_task(cls, spiff_task: SpiffTask, process_instance: ProcessInstanceModel) -> TaskModel: + spiff_task_guid = str(spiff_task.id) + task_model: TaskModel = TaskModel.query.filter_by(guid=spiff_task_guid).first() + # if task_model is None: + # task_model = TaskModel(guid=spiff_task_guid, bpmn_process_id=process_instance.bpmn_process_id) + # db.session.add(task_model) + # db.session.commit() + return task_model diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py index 576dee1ba..89ca9caab 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py @@ -2,7 +2,9 @@ import logging import time from typing import Callable from typing import List +from typing import Optional +from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflowSerializer # type: ignore from SpiffWorkflow.bpmn.workflow import BpmnWorkflow # type: ignore from SpiffWorkflow.exceptions import SpiffWorkflowException # type: ignore from SpiffWorkflow.task import Task as SpiffTask # type: ignore @@ -16,18 +18,20 @@ from spiffworkflow_backend.models.message_instance_correlation import ( ) from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.spiff_step_details import SpiffStepDetailsModel +from spiffworkflow_backend.models.task import TaskModel # noqa: F401 +from spiffworkflow_backend.services.task_service import TaskService class EngineStepDelegate: """Interface of sorts for a concrete engine step delegate.""" - def will_complete_task(self, task: SpiffTask) -> None: + def will_complete_task(self, spiff_task: SpiffTask) -> None: pass - def did_complete_task(self, task: SpiffTask) -> None: + def did_complete_task(self, spiff_task: SpiffTask) -> None: pass - def save(self) -> None: + def save(self, commit: bool = False) -> None: pass @@ -35,6 +39,54 @@ SpiffStepIncrementer = Callable[[], None] SpiffStepDetailsMappingBuilder = Callable[[SpiffTask, float, float], dict] +class TaskModelSavingDelegate(EngineStepDelegate): + """Engine step delegate that takes care of saving a task model to the database. + + It can also be given another EngineStepDelegate. 
+ """ + + def __init__( + self, + serializer: BpmnWorkflowSerializer, + process_instance: ProcessInstanceModel, + secondary_engine_step_delegate: Optional[EngineStepDelegate] = None, + ) -> None: + self.secondary_engine_step_delegate = secondary_engine_step_delegate + self.process_instance = process_instance + + self.current_task_model: Optional[TaskModel] = None + self.serializer = serializer + + def should_update_task_model(self) -> bool: + return self.process_instance.bpmn_process_id is not None + + def will_complete_task(self, spiff_task: SpiffTask) -> None: + if self.should_update_task_model(): + self.current_task_model = ( + TaskService.find_or_create_task_model_from_spiff_task( + spiff_task, self.process_instance + ) + ) + self.current_task_model.start_in_seconds = time.time() + if self.secondary_engine_step_delegate: + self.secondary_engine_step_delegate.will_complete_task(spiff_task) + + def did_complete_task(self, spiff_task: SpiffTask) -> None: + if self.current_task_model and self.should_update_task_model(): + self.current_task_model.end_in_seconds = time.time() + TaskService.update_task_model_and_add_to_db_session( + self.current_task_model, spiff_task, self.serializer + ) + db.session.add(self.current_task_model) + if self.secondary_engine_step_delegate: + self.secondary_engine_step_delegate.did_complete_task(spiff_task) + + def save(self, _commit: bool = True) -> None: + if self.secondary_engine_step_delegate: + self.secondary_engine_step_delegate.save(commit=False) + db.session.commit() + + class StepDetailLoggingDelegate(EngineStepDelegate): """Engine step delegate that takes care of logging spiff step details. @@ -65,28 +117,29 @@ class StepDetailLoggingDelegate(EngineStepDelegate): "Transactional Subprocess", } - def should_log(self, task: SpiffTask) -> bool: + def should_log(self, spiff_task: SpiffTask) -> bool: return ( - task.task_spec.spec_type in self.tasks_to_log - and not task.task_spec.name.endswith(".EndJoin") + spiff_task.task_spec.spec_type in self.tasks_to_log + and not spiff_task.task_spec.name.endswith(".EndJoin") ) - def will_complete_task(self, task: SpiffTask) -> None: - if self.should_log(task): + def will_complete_task(self, spiff_task: SpiffTask) -> None: + if self.should_log(spiff_task): self.current_task_start_in_seconds = time.time() self.increment_spiff_step() - def did_complete_task(self, task: SpiffTask) -> None: - if self.should_log(task): + def did_complete_task(self, spiff_task: SpiffTask) -> None: + if self.should_log(spiff_task): self.step_details.append( self.spiff_step_details_mapping( - task, self.current_task_start_in_seconds, time.time() + spiff_task, self.current_task_start_in_seconds, time.time() ) ) - def save(self) -> None: + def save(self, commit: bool = True) -> None: db.session.bulk_insert_mappings(SpiffStepDetailsModel, self.step_details) - db.session.commit() + if commit: + db.session.commit() class ExecutionStrategy: @@ -136,12 +189,12 @@ class RunUntilServiceTaskExecutionStrategy(ExecutionStrategy): ] ) while engine_steps: - for task in engine_steps: - if task.task_spec.spec_type == "Service Task": + for spiff_task in engine_steps: + if spiff_task.task_spec.spec_type == "Service Task": return - self.delegate.will_complete_task(task) - task.complete() - self.delegate.did_complete_task(task) + self.delegate.will_complete_task(spiff_task) + spiff_task.complete() + self.delegate.did_complete_task(spiff_task) engine_steps = list( [ diff --git a/spiffworkflow-backend/tests/data/manual_task/manual_task.bpmn 
diff --git a/spiffworkflow-backend/tests/data/manual_task/manual_task.bpmn b/spiffworkflow-backend/tests/data/manual_task/manual_task.bpmn
index 4f0fba72c..f4d0190bc 100644
--- a/spiffworkflow-backend/tests/data/manual_task/manual_task.bpmn
+++ b/spiffworkflow-backend/tests/data/manual_task/manual_task.bpmn
@@ -2,39 +2,52 @@
[BPMN XML hunk unrecoverable: the markup was stripped in extraction. What survives shows the change inserts a script task (script: the_new_var = "HEY") between the start event and the "## Hello" manual task: the start event's outgoing flow Flow_1xlck7g is replaced by Flow_0stlaxe into the script task, Flow_1pmem7s connects the script task to the manual task, the existing Flow_0nnh2x9 into the end event is unchanged, and the BPMN diagram shapes and edges are updated to match.]
diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py
index 75eb31464..7fa0f13b8 100644
--- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py
+++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py
@@ -10,6 +10,7 @@ from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
 from spiffworkflow_backend.models.group import GroupModel
 from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
 from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
+from spiffworkflow_backend.models.task import TaskModel  # noqa: F401
 from spiffworkflow_backend.models.user import UserModel
 from spiffworkflow_backend.services.authorization_service import AuthorizationService
 from spiffworkflow_backend.services.authorization_service import (
@@ -292,6 +293,68 @@ class TestProcessInstanceProcessor(BaseTest):
         assert spiff_task is not None
         assert spiff_task.state == TaskState.COMPLETED
 
+    def test_properly_saves_tasks_when_running(
+        self,
+        app: Flask,
+        client: FlaskClient,
+        with_db_and_bpmn_file_cleanup: None,
+        with_super_admin_user: UserModel,
+    ) -> None:
+        """Test_properly_saves_tasks_when_running."""
+        self.create_process_group(
+            client, with_super_admin_user, "test_group", "test_group"
+        )
+        initiator_user = self.find_or_create_user("initiator_user")
+        finance_user_three = self.find_or_create_user("testuser3")
+        assert initiator_user.principal is not None
+        assert finance_user_three.principal is not None
+        AuthorizationService.import_permissions_from_yaml_file()
+
+        finance_group = GroupModel.query.filter_by(identifier="Finance Team").first()
+        assert finance_group is not None
+
+        process_model = load_test_spec(
+            process_model_id="test_group/manual_task",
+            bpmn_file_name="manual_task.bpmn",
+            process_model_source_directory="manual_task",
+        )
+        process_instance = self.create_process_instance_from_process_model(
+            process_model=process_model, user=initiator_user
+        )
+        processor = ProcessInstanceProcessor(process_instance)
+        processor.do_engine_steps(save=True)
+        assert len(process_instance.active_human_tasks) == 1
+        initial_human_task_id = process_instance.active_human_tasks[0].id
+
+        # save again to ensure we attempt to process the human tasks again
+        processor.save()
+
+        assert len(process_instance.active_human_tasks) == 1
+        assert initial_human_task_id == process_instance.active_human_tasks[0].id
+
+        processor = ProcessInstanceProcessor(process_instance)
+        human_task_one = process_instance.active_human_tasks[0]
+        spiff_task = processor.__class__.get_task_by_bpmn_identifier(
+            human_task_one.task_name, processor.bpmn_process_instance
+        )
+        ProcessInstanceService.complete_form_task(
+            processor, spiff_task, {}, initiator_user, human_task_one
+        )
+
+        process_instance_relookup = ProcessInstanceModel.query.filter_by(
+            id=process_instance.id
+        ).first()
+        processor = ProcessInstanceProcessor(process_instance_relookup)
+        assert process_instance_relookup.status == "complete"
+        task = TaskModel.query.filter_by(guid=human_task_one.task_id).first()
+        assert task.state == "COMPLETED"
+        end_event_spiff_task = processor.__class__.get_task_by_bpmn_identifier(
+            "end_event_of_manual_task_model", processor.bpmn_process_instance
+        )
+        assert end_event_spiff_task
+        assert end_event_spiff_task.state == TaskState.COMPLETED
+        # # NOTE: also check the spiff task from the new processor
+
     def test_does_not_recreate_human_tasks_on_multiple_saves(
         self,
         app: Flask,
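One way to act on the trailing `# NOTE` in the new test would be to also assert on the persisted task data, not just the state. This is a hypothetical extra assertion, not part of the diff; it assumes `JsonDataModel` exposes the `hash` and `data` columns that `TaskService` writes, and that the script task's `the_new_var` propagates into the completed manual task's data snapshot:

```python
from spiffworkflow_backend.models.json_data import JsonDataModel
from spiffworkflow_backend.models.task import TaskModel

# Hypothetical extension of test_properly_saves_tasks_when_running: verify the
# script task's output survived the round trip through the json_data table.
task_model = TaskModel.query.filter_by(guid=human_task_one.task_id).first()
json_data = JsonDataModel.query.filter_by(hash=task_model.json_data_hash).first()
assert json_data is not None
assert json_data.data["the_new_var"] == "HEY"
```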