From 7806e6c460de20710fb5b71894306d561fda3602 Mon Sep 17 00:00:00 2001 From: jasquat Date: Fri, 10 Mar 2023 16:52:57 -0500 Subject: [PATCH] tests are now passing w/ burnettk --- .../services/process_instance_processor.py | 6 +- .../services/task_service.py | 83 +++++++++++-------- .../services/workflow_execution_service.py | 38 ++++++++- .../unit/test_process_instance_processor.py | 9 +- 4 files changed, 97 insertions(+), 39 deletions(-) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index 3b9c43aca..0d52adb05 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -50,7 +50,6 @@ from SpiffWorkflow.serializer.exceptions import MissingSpecError # type: ignore from SpiffWorkflow.spiff.serializer.config import SPIFF_SPEC_CONFIG # type: ignore from SpiffWorkflow.task import Task as SpiffTask # type: ignore from SpiffWorkflow.task import TaskState -from SpiffWorkflow.task import TaskStateNames from SpiffWorkflow.util.deep_merge import DeepMerge # type: ignore from sqlalchemy import text @@ -612,6 +611,7 @@ class ProcessInstanceProcessor: ).first() bpmn_process_dict = {"data": json_data.data, "tasks": {}} bpmn_process_dict.update(bpmn_process.properties_json) + print(f"bpmn_process_dict: {bpmn_process_dict}") tasks = TaskModel.query.filter_by(bpmn_process_id=bpmn_process.id).all() for task in tasks: json_data = JsonDataModel.query.filter_by(hash=task.json_data_hash).first() @@ -1062,7 +1062,9 @@ class ProcessInstanceProcessor: self._add_bpmn_process_definitions(bpmn_spec_dict) subprocesses = process_instance_data_dict.pop("subprocesses") - bpmn_process_parent = TaskService.add_bpmn_process(process_instance_data_dict, self.process_instance_model) + bpmn_process_parent = TaskService.add_bpmn_process( + process_instance_data_dict, self.process_instance_model + ) for subprocess_task_id, subprocess_properties in subprocesses.items(): TaskService.add_bpmn_process( subprocess_properties, diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py index 763dd1b68..775e29099 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py @@ -1,10 +1,10 @@ import json from hashlib import sha256 -from typing import Tuple -from typing import Any from typing import Optional +from typing import Tuple -from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflowSerializer, BpmnWorkflow # type: ignore +from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflow # type: ignore +from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflowSerializer from SpiffWorkflow.task import Task as SpiffTask # type: ignore from SpiffWorkflow.task import TaskStateNames @@ -20,8 +20,8 @@ class TaskService: def update_task_data_on_task_model( cls, task_model: TaskModel, task_data_dict: dict ) -> None: - task_data_json = json.dumps(task_data_dict, sort_keys=True).encode("utf8") - task_data_hash = sha256(task_data_json).hexdigest() + task_data_json = json.dumps(task_data_dict, sort_keys=True) + task_data_hash = sha256(task_data_json.encode("utf8")).hexdigest() if task_model.json_data_hash != task_data_hash: json_data = ( db.session.query(JsonDataModel.id) @@ -45,34 +45,38 @@ class TaskService: This will NOT update start_in_seconds or end_in_seconds. """ new_properties_json = serializer.task_to_dict(spiff_task) + spiff_task_data = new_properties_json.pop("data") task_model.properties_json = new_properties_json task_model.state = TaskStateNames[new_properties_json["state"]] - cls.update_task_data_on_task_model(task_model, spiff_task.data) + cls.update_task_data_on_task_model(task_model, spiff_task_data) db.session.add(task_model) @classmethod def find_or_create_task_model_from_spiff_task( - cls, spiff_task: SpiffTask, process_instance: ProcessInstanceModel, - serializer: BpmnWorkflowSerializer + cls, + spiff_task: SpiffTask, + process_instance: ProcessInstanceModel, + serializer: BpmnWorkflowSerializer, ) -> TaskModel: spiff_task_guid = str(spiff_task.id) task_model: Optional[TaskModel] = TaskModel.query.filter_by( guid=spiff_task_guid ).first() if task_model is None: - bpmn_process = cls.task_bpmn_process(spiff_task, process_instance, serializer) - task_model = TaskModel.query.filter_by( - guid=spiff_task_guid - ).first() + bpmn_process = cls.task_bpmn_process( + spiff_task, process_instance, serializer + ) + task_model = TaskModel.query.filter_by(guid=spiff_task_guid).first() if task_model is None: task_model = TaskModel( guid=spiff_task_guid, bpmn_process_id=bpmn_process.id ) - db.session.commit() return task_model @classmethod - def task_subprocess(cls, spiff_task: SpiffTask) -> Tuple[Optional[str], Optional[BpmnWorkflow]]: + def task_subprocess( + cls, spiff_task: SpiffTask + ) -> Tuple[Optional[str], Optional[BpmnWorkflow]]: top_level_workflow = spiff_task.workflow._get_outermost_workflow() my_wf = spiff_task.workflow # This is the workflow the spiff_task is part of my_sp = None @@ -88,27 +92,38 @@ class TaskService: @classmethod def task_bpmn_process( - cls, spiff_task: SpiffTask, process_instance: ProcessInstanceModel, - serializer: BpmnWorkflowSerializer + cls, + spiff_task: SpiffTask, + process_instance: ProcessInstanceModel, + serializer: BpmnWorkflowSerializer, ) -> BpmnProcessModel: subprocess_guid, subprocess = cls.task_subprocess(spiff_task) + bpmn_process: Optional[BpmnProcessModel] = None if subprocess is None: + bpmn_process = process_instance.bpmn_process # This is the top level workflow, which has no guid - return process_instance.bpmn_process + # check for bpmn_process_id because mypy doesn't realize bpmn_process can be None + if process_instance.bpmn_process_id is None: + bpmn_process = cls.add_bpmn_process( + serializer.workflow_to_dict( + spiff_task.workflow._get_outermost_workflow() + ), + process_instance, + ) + db.session.commit() else: - # import pdb; pdb.set_trace() - bpmn_process: Optional[BpmnProcessModel] = BpmnProcessModel.query.filter_by( + bpmn_process = BpmnProcessModel.query.filter_by( guid=subprocess_guid ).first() - # import pdb; pdb.set_trace() if bpmn_process is None: - bpmn_process = cls.add_bpmn_process(serializer.workflow_to_dict(subprocess), process_instance, process_instance.bpmn_process, subprocess_guid) + bpmn_process = cls.add_bpmn_process( + serializer.workflow_to_dict(subprocess), + process_instance, + process_instance.bpmn_process, + subprocess_guid, + ) db.session.commit() - # spiff_task_guid = str(spiff_task.id) - # raise Exception( - # f"Could not find bpmn_process for task {spiff_task_guid}" - # ) - return bpmn_process + return bpmn_process @classmethod def add_bpmn_process( @@ -119,7 +134,7 @@ class TaskService: bpmn_process_guid: Optional[str] = None, ) -> BpmnProcessModel: tasks = bpmn_process_dict.pop("tasks") - bpmn_process_data = bpmn_process_dict.pop("data") + bpmn_process_data_dict = bpmn_process_dict.pop("data") bpmn_process = None if bpmn_process_parent is not None: @@ -136,10 +151,10 @@ class TaskService: bpmn_process.properties_json = bpmn_process_dict - bpmn_process_data_json = json.dumps(bpmn_process_data, sort_keys=True).encode( - "utf8" - ) - bpmn_process_data_hash = sha256(bpmn_process_data_json).hexdigest() + bpmn_process_data_json = json.dumps(bpmn_process_data_dict, sort_keys=True) + bpmn_process_data_hash = sha256( + bpmn_process_data_json.encode("utf8") + ).hexdigest() if bpmn_process.json_data_hash != bpmn_process_data_hash: json_data = ( db.session.query(JsonDataModel.id) @@ -148,7 +163,7 @@ class TaskService: ) if json_data is None: json_data = JsonDataModel( - hash=bpmn_process_data_hash, data=bpmn_process_data + hash=bpmn_process_data_hash, data=bpmn_process_data_dict ) db.session.add(json_data) bpmn_process.json_data_hash = bpmn_process_data_hash @@ -173,7 +188,9 @@ class TaskService: # .join(BpmnProcessDefinitionModel).filter(BpmnProcessDefinitionModel.bpmn_identifier==bpmn_process_identifier).first() # if task_definition is None: # subprocess_task = TaskModel.query.filter_by(guid=bpmn_process.guid) - task_model = TaskModel(guid=task_id, bpmn_process_id=bpmn_process.id) + task_model = TaskModel( + guid=task_id, bpmn_process_id=bpmn_process.id + ) task_model.state = TaskStateNames[state_int] task_model.properties_json = task_properties diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py index 37b289c0a..ef484fe66 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py @@ -1,6 +1,5 @@ import logging import time -from typing import Any from typing import Callable from typing import List from typing import Optional @@ -35,6 +34,9 @@ class EngineStepDelegate: def save(self, commit: bool = False) -> None: pass + def after_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> None: + pass + SpiffStepIncrementer = Callable[[], None] SpiffStepDetailsMappingBuilder = Callable[[SpiffTask, float, float], dict] @@ -92,6 +94,22 @@ class TaskModelSavingDelegate(EngineStepDelegate): self.secondary_engine_step_delegate.save(commit=False) db.session.commit() + def after_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> None: + for waiting_spiff_task in bpmn_process_instance.get_tasks( + TaskState.WAITING | TaskState.CANCELLED + ): + task_model = TaskModel.query.filter_by( + guid=str(waiting_spiff_task.id) + ).first() + if task_model is None: + task_model = TaskService.find_or_create_task_model_from_spiff_task( + waiting_spiff_task, self.process_instance, self.serializer + ) + TaskService.update_task_model_and_add_to_db_session( + task_model, waiting_spiff_task, self.serializer + ) + db.session.commit() + class StepDetailLoggingDelegate(EngineStepDelegate): """Engine step delegate that takes care of logging spiff step details. @@ -175,6 +193,7 @@ class GreedyExecutionStrategy(ExecutionStrategy): will_complete_task=self.delegate.will_complete_task, did_complete_task=self.delegate.did_complete_task, ) + self.delegate.after_engine_steps(bpmn_process_instance) class RunUntilServiceTaskExecutionStrategy(ExecutionStrategy): @@ -210,6 +229,8 @@ class RunUntilServiceTaskExecutionStrategy(ExecutionStrategy): ] ) + self.delegate.after_engine_steps(bpmn_process_instance) + def execution_strategy_named( name: str, delegate: EngineStepDelegate @@ -283,6 +304,13 @@ class WorkflowExecutionService: correlation_keys=self.bpmn_process_instance.correlations, ) db.session.add(message_instance) + + bpmn_process = self.process_instance_model.bpmn_process + if bpmn_process is not None: + bpmn_process_correlations = self.bpmn_process_instance.correlations + bpmn_process.properties_json["correlations"] = bpmn_process_correlations + db.session.add(bpmn_process) + db.session.commit() def queue_waiting_receive_messages(self) -> None: @@ -320,6 +348,14 @@ class WorkflowExecutionService: ) message_instance.correlation_rules.append(message_correlation) db.session.add(message_instance) + + bpmn_process = self.process_instance_model.bpmn_process + + if bpmn_process is not None: + bpmn_process_correlations = self.bpmn_process_instance.correlations + bpmn_process.properties_json["correlations"] = bpmn_process_correlations + db.session.add(bpmn_process) + db.session.commit() diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py index 5e586fa08..f124bcd96 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py @@ -347,9 +347,12 @@ class TestProcessInstanceProcessor(BaseTest): processor_final = ProcessInstanceProcessor(process_instance_relookup) assert process_instance_relookup.status == "complete" - first_data_set = {'set_in_top_level_script': 1} - second_data_set = {**first_data_set, **{'set_in_top_level_subprocess': 1}} - third_data_set = {**second_data_set, **{'set_in_test_process_to_call_script': 1}} + first_data_set = {"set_in_top_level_script": 1} + second_data_set = {**first_data_set, **{"set_in_top_level_subprocess": 1}} + third_data_set = { + **second_data_set, + **{"set_in_test_process_to_call_script": 1}, + } expected_task_data = { "top_level_script": first_data_set, "manual_task": first_data_set,