From 5c0d72ab919ccdd0ef64f839a5a4d8b2ffae9e74 Mon Sep 17 00:00:00 2001 From: jasquat Date: Thu, 16 Mar 2023 16:14:41 -0400 Subject: [PATCH] tests are now passing w/ burnettk --- .../src/spiffworkflow_backend/models/task.py | 1 + .../services/process_instance_processor.py | 16 ++- .../services/task_service.py | 23 +-- .../services/workflow_execution_service.py | 133 ++++++++---------- .../integration/test_process_api.py | 4 +- .../unit/test_process_instance_processor.py | 36 ++++- 6 files changed, 126 insertions(+), 87 deletions(-) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/task.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/task.py index 5bee3b395..959477236 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/models/task.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/models/task.py @@ -50,6 +50,7 @@ class TaskModel(SpiffworkflowBaseDBModel): id: int = db.Column(db.Integer, primary_key=True) guid: str = db.Column(db.String(36), nullable=False, unique=True, index=True) bpmn_process_id: int = db.Column(ForeignKey(BpmnProcessModel.id), nullable=False) # type: ignore + bpmn_process = relationship(BpmnProcessModel) process_instance_id: int = db.Column(ForeignKey("process_instance.id"), nullable=False) # find this by looking up the "workflow_name" and "task_spec" from the properties_json diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index 3f00452de..c4bc4856c 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -326,6 +326,10 @@ class CustomBpmnScriptEngine(PythonScriptEngine): # type: ignore environment = CustomScriptEngineEnvironment(default_globals) + # right now spiff is executing script tasks on ready so doing this + # so we know when something fails and we can save it to our database. + self.failing_spiff_task: Optional[SpiffTask] = None + super().__init__(environment=environment) def __get_augment_methods(self, task: Optional[SpiffTask]) -> Dict[str, Callable]: @@ -385,11 +389,14 @@ class CustomBpmnScriptEngine(PythonScriptEngine): # type: ignore def execute(self, task: SpiffTask, script: str, external_methods: Any = None) -> None: """Execute.""" try: + # reset failing task just in case + self.failing_spiff_task = None methods = self.__get_augment_methods(task) if external_methods: methods.update(external_methods) super().execute(task, script, methods) except WorkflowException as e: + self.failing_spiff_task = task raise e except Exception as e: raise self.create_task_exec_exception(task, script, e) from e @@ -1558,7 +1565,6 @@ class ProcessInstanceProcessor: serializer=self._serializer, process_instance=self.process_instance_model, bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings, - # script_engine=self._script_engine, ) if execution_strategy_name is None: @@ -1572,7 +1578,13 @@ class ProcessInstanceProcessor: self._script_engine.environment.finalize_result, self.save, ) - execution_service.do_engine_steps(exit_at, save) + try: + execution_service.do_engine_steps(exit_at, save) + finally: + # clear out failling spiff tasks here since the ProcessInstanceProcessor creates an instance of the + # script engine on a class variable. + if hasattr(self._script_engine, "failing_spiff_task") and self._script_engine.failing_spiff_task is not None: + self._script_engine.failing_spiff_task = None # log the spiff step details so we know what is processing the process # instance when a human task has a timer event. diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py index 6368eb82b..a83a60c37 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py @@ -202,14 +202,9 @@ class TaskService: bpmn_process.properties_json = bpmn_process_dict - bpmn_process_data_json = json.dumps(bpmn_process_data_dict, sort_keys=True) - bpmn_process_data_hash = sha256(bpmn_process_data_json.encode("utf8")).hexdigest() - if bpmn_process.json_data_hash != bpmn_process_data_hash: - new_json_data_dicts[bpmn_process_data_hash] = { - "hash": bpmn_process_data_hash, - "data": bpmn_process_data_dict, - } - bpmn_process.json_data_hash = bpmn_process_data_hash + bpmn_process_json_data = cls.update_task_data_on_bpmn_process(bpmn_process, bpmn_process_data_dict) + if bpmn_process_json_data is not None: + new_json_data_dicts[bpmn_process_json_data['hash']] = bpmn_process_json_data if bpmn_process_parent is None: process_instance.bpmn_process = bpmn_process @@ -261,6 +256,18 @@ class TaskService: return (bpmn_process, new_task_models, new_json_data_dicts) + @classmethod + def update_task_data_on_bpmn_process( + cls, bpmn_process: BpmnProcessModel, bpmn_process_data_dict: dict + ) -> Optional[JsonDataDict]: + bpmn_process_data_json = json.dumps(bpmn_process_data_dict, sort_keys=True) + bpmn_process_data_hash: str = sha256(bpmn_process_data_json.encode("utf8")).hexdigest() + json_data_dict: Optional[JsonDataDict] = None + if bpmn_process.json_data_hash != bpmn_process_data_hash: + json_data_dict = {"hash": bpmn_process_data_hash, "data": bpmn_process_data_dict} + bpmn_process.json_data_hash = bpmn_process_data_hash + return json_data_dict + @classmethod def _update_task_data_on_task_model( cls, task_model: TaskModel, task_data_dict: dict, task_model_data_column: str diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py index 19710ea8a..3f7359016 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py @@ -36,7 +36,7 @@ class EngineStepDelegate: def did_complete_task(self, spiff_task: SpiffTask) -> None: pass - def save(self, commit: bool = False) -> None: + def save(self, bpmn_process_instance: BpmnWorkflow, commit: bool = False) -> None: pass def after_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> None: @@ -58,13 +58,11 @@ class TaskModelSavingDelegate(EngineStepDelegate): serializer: BpmnWorkflowSerializer, process_instance: ProcessInstanceModel, bpmn_definition_to_task_definitions_mappings: dict, - # script_engine, secondary_engine_step_delegate: Optional[EngineStepDelegate] = None, ) -> None: self.secondary_engine_step_delegate = secondary_engine_step_delegate self.process_instance = process_instance self.bpmn_definition_to_task_definitions_mappings = bpmn_definition_to_task_definitions_mappings - # self.script_engine = script_engine self.current_task_model: Optional[TaskModel] = None self.current_task_start_in_seconds: Optional[float] = None @@ -72,7 +70,41 @@ class TaskModelSavingDelegate(EngineStepDelegate): self.json_data_dicts: dict[str, JsonDataDict] = {} self.serializer = serializer - def should_update_task_model(self) -> bool: + def will_complete_task(self, spiff_task: SpiffTask) -> None: + if self._should_update_task_model(): + self.current_task_start_in_seconds = time.time() + if self.secondary_engine_step_delegate: + self.secondary_engine_step_delegate.will_complete_task(spiff_task) + + def did_complete_task(self, spiff_task: SpiffTask) -> None: + if self._should_update_task_model(): + self._update_task_model_with_spiff_task(spiff_task) + if self.secondary_engine_step_delegate: + self.secondary_engine_step_delegate.did_complete_task(spiff_task) + + def save(self, bpmn_process_instance: BpmnWorkflow, _commit: bool = True) -> None: + script_engine = bpmn_process_instance.script_engine + if hasattr(script_engine, "failing_spiff_task") and script_engine.failing_spiff_task is not None: + failing_spiff_task = script_engine.failing_spiff_task + self._update_task_model_with_spiff_task(failing_spiff_task) + + db.session.bulk_save_objects(self.task_models.values()) + + TaskService.insert_or_update_json_data_records(self.json_data_dicts) + + if self.secondary_engine_step_delegate: + self.secondary_engine_step_delegate.save(bpmn_process_instance, commit=False) + db.session.commit() + + def after_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> None: + if self._should_update_task_model(): + # excludes FUTURE and COMPLETED. the others were required to get PP1 to go to completion. + for waiting_spiff_task in bpmn_process_instance.get_tasks( + TaskState.WAITING | TaskState.CANCELLED | TaskState.READY | TaskState.MAYBE | TaskState.LIKELY + ): + self._update_task_model_with_spiff_task(waiting_spiff_task) + + def _should_update_task_model(self) -> bool: """We need to figure out if we have previously save task info on this process intance. Use the bpmn_process_id to do this. @@ -85,76 +117,23 @@ class TaskModelSavingDelegate(EngineStepDelegate): if json_data_dict is not None: self.json_data_dicts[json_data_dict["hash"]] = json_data_dict - def will_complete_task(self, spiff_task: SpiffTask) -> None: - if self.should_update_task_model(): - # _bpmn_process, task_model, new_task_models, new_json_data_dicts = ( - # TaskService.find_or_create_task_model_from_spiff_task( - # spiff_task, - # self.process_instance, - # self.serializer, - # bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings, - # ) - # ) - # self.current_task_model = task_model - # self.task_models.update(new_task_models) - # self.json_data_dicts.update(new_json_data_dicts) - # self.current_task_model.start_in_seconds = time.time() - self.current_task_start_in_seconds = time.time() - if self.secondary_engine_step_delegate: - self.secondary_engine_step_delegate.will_complete_task(spiff_task) - - def did_complete_task(self, spiff_task: SpiffTask) -> None: - # if self.current_task_model and self.should_update_task_model(): - if self.should_update_task_model(): - # if spiff_task.task_spec.name == 'top_level_script': - # import pdb; pdb.set_trace() - # spiff_task.workflow.script_engine.environment.revise_state_with_task_data(spiff_task) - _bpmn_process, task_model, new_task_models, new_json_data_dicts = ( - TaskService.find_or_create_task_model_from_spiff_task( - spiff_task, - self.process_instance, - self.serializer, - bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings, - ) + def _update_task_model_with_spiff_task(self, spiff_task: SpiffTask) -> None: + bpmn_process, task_model, new_task_models, new_json_data_dicts = ( + TaskService.find_or_create_task_model_from_spiff_task( + spiff_task, + self.process_instance, + self.serializer, + bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings, ) - task_model.start_in_seconds = self.current_task_start_in_seconds or time.time() - task_model.end_in_seconds = time.time() - json_data_dict_list = TaskService.update_task_model(task_model, spiff_task, self.serializer) - self._update_json_data_dicts_using_list(json_data_dict_list) - self.task_models.update(new_task_models) - self.json_data_dicts.update(new_json_data_dicts) - self.task_models[task_model.guid] = task_model - if self.secondary_engine_step_delegate: - self.secondary_engine_step_delegate.did_complete_task(spiff_task) - - def save(self, _commit: bool = True) -> None: - db.session.bulk_save_objects(self.task_models.values()) - - TaskService.insert_or_update_json_data_records(self.json_data_dicts) - - if self.secondary_engine_step_delegate: - self.secondary_engine_step_delegate.save(commit=False) - db.session.commit() - - def after_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> None: - if self.should_update_task_model(): - # excludes FUTURE and COMPLETED. the others were required to get PP1 to go to completion. - for waiting_spiff_task in bpmn_process_instance.get_tasks( - TaskState.WAITING | TaskState.CANCELLED | TaskState.READY | TaskState.MAYBE | TaskState.LIKELY - ): - _bpmn_process, task_model, new_task_models, new_json_data_dicts = ( - TaskService.find_or_create_task_model_from_spiff_task( - waiting_spiff_task, - self.process_instance, - self.serializer, - bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings, - ) - ) - self.task_models.update(new_task_models) - self.json_data_dicts.update(new_json_data_dicts) - json_data_dict_list = TaskService.update_task_model(task_model, waiting_spiff_task, self.serializer) - self.task_models[task_model.guid] = task_model - self._update_json_data_dicts_using_list(json_data_dict_list) + ) + bpmn_process_json_data = TaskService.update_task_data_on_bpmn_process(bpmn_process or task_model.bpmn_process, spiff_task.workflow.data) + self.task_models.update(new_task_models) + self.json_data_dicts.update(new_json_data_dicts) + json_data_dict_list = TaskService.update_task_model(task_model, spiff_task, self.serializer) + self.task_models[task_model.guid] = task_model + if bpmn_process_json_data is not None: + json_data_dict_list.append(bpmn_process_json_data) + self._update_json_data_dicts_using_list(json_data_dict_list) class StepDetailLoggingDelegate(EngineStepDelegate): @@ -203,7 +182,7 @@ class StepDetailLoggingDelegate(EngineStepDelegate): self.spiff_step_details_mapping(spiff_task, self.current_task_start_in_seconds, time.time()) ) - def save(self, commit: bool = True) -> None: + def save(self, _bpmn_process_instance: BpmnWorkflow, commit: bool = True) -> None: db.session.bulk_insert_mappings(SpiffStepDetailsModel, self.step_details) if commit: db.session.commit() @@ -215,18 +194,20 @@ class ExecutionStrategy: def __init__(self, delegate: EngineStepDelegate): """__init__.""" self.delegate = delegate + self.bpmn_process_instance = None def do_engine_steps(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None: pass def save(self) -> None: - self.delegate.save() + self.delegate.save(self.bpmn_process_instance) class GreedyExecutionStrategy(ExecutionStrategy): """The common execution strategy. This will greedily run all engine steps without stopping.""" def do_engine_steps(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None: + self.bpmn_process_instance = bpmn_process_instance bpmn_process_instance.do_engine_steps( exit_at=exit_at, will_complete_task=self.delegate.will_complete_task, @@ -243,6 +224,7 @@ class RunUntilServiceTaskExecutionStrategy(ExecutionStrategy): """ def do_engine_steps(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None: + self.bpmn_process_instance = bpmn_process_instance engine_steps = list( [ t @@ -310,6 +292,7 @@ class WorkflowExecutionService: ) try: + # import pdb; pdb.set_trace() self.bpmn_process_instance.refresh_waiting_tasks() # TODO: implicit re-entrant locks here `with_dequeued` diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py b/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py index 086841c08..3d7b729b7 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py @@ -1,5 +1,6 @@ """Test Process Api Blueprint.""" import io +from SpiffWorkflow.task import TaskState import json import os import time @@ -2067,7 +2068,7 @@ class TestProcessApi(BaseTest): # TODO: make sure the system notification process is run on exceptions ... - def test_task_data_is_set_even_if_process_instance_errors( + def test_task_data_is_set_even_if_process_instance_errors_through_the_api( self, app: Flask, client: FlaskClient, @@ -2093,6 +2094,7 @@ class TestProcessApi(BaseTest): processor = ProcessInstanceProcessor(process_instance) spiff_task = processor.get_task_by_bpmn_identifier("script_task_two", processor.bpmn_process_instance) assert spiff_task is not None + assert spiff_task.state == TaskState.WAITING assert spiff_task.data == {"my_var": "THE VAR"} def test_process_model_file_create( diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py index 00aac15e9..bf244a644 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py @@ -5,7 +5,8 @@ import pytest from flask import g from flask.app import Flask from flask.testing import FlaskClient -from SpiffWorkflow.task import TaskState # type: ignore +from SpiffWorkflow.task import TaskState +from spiffworkflow_backend.exceptions.api_error import ApiError # type: ignore from tests.spiffworkflow_backend.helpers.base_test import BaseTest from tests.spiffworkflow_backend.helpers.test_data import load_test_spec @@ -318,6 +319,7 @@ class TestProcessInstanceProcessor(BaseTest): **{"set_in_test_process_to_call_script": 1}, } fourth_data_set = {**third_data_set, **{"a": 1, "we_move_on": True}} + fifth_data_set = {**fourth_data_set, **{'validate_only': False, 'set_top_level_process_script_after_gate': 1}} expected_task_data = { "top_level_script": first_data_set, "manual_task": first_data_set, @@ -345,6 +347,7 @@ class TestProcessInstanceProcessor(BaseTest): assert task_definition.bpmn_identifier == spiff_task_name assert task_definition.bpmn_process_definition.bpmn_identifier == bpmn_process_identifier message = f"{base_failure_message} Expected: {expected_python_env_data}. Received: {task.json_data()}" + # TODO: if we split out env data again we will need to use it here instead of json_data # assert task.python_env_data() == expected_python_env_data, message assert task.json_data() == expected_python_env_data, message spiff_tasks_checked_once.append(spiff_task.task_spec.name) @@ -357,6 +360,8 @@ class TestProcessInstanceProcessor(BaseTest): assert_spiff_task_is_in_process("top_level_subprocess_script", "top_level_subprocess") assert_spiff_task_is_in_process("top_level_script", "top_level_process") + assert processor.get_data() == fifth_data_set + def test_does_not_recreate_human_tasks_on_multiple_saves( self, app: Flask, @@ -469,3 +474,32 @@ class TestProcessInstanceProcessor(BaseTest): # EDIT: when using feature/remove-loop-reset branch of SpiffWorkflow, these should be different. assert human_task_two.task_id != human_task_one.task_id + + def test_task_data_is_set_even_if_process_instance_errors( + self, + app: Flask, + client: FlaskClient, + with_db_and_bpmn_file_cleanup: None, + with_super_admin_user: UserModel, + ) -> None: + """Test_task_data_is_set_even_if_process_instance_errors.""" + process_model = load_test_spec( + process_model_id="group/error_with_task_data", + bpmn_file_name="script_error_with_task_data.bpmn", + process_model_source_directory="error", + ) + process_instance = self.create_process_instance_from_process_model( + process_model=process_model, user=with_super_admin_user + ) + + processor = ProcessInstanceProcessor(process_instance) + with pytest.raises(ApiError): + processor.do_engine_steps(save=True) + + process_instance_final = ProcessInstanceModel.query.filter_by(id=process_instance.id).first() + processor_final = ProcessInstanceProcessor(process_instance_final) + + spiff_task = processor_final.get_task_by_bpmn_identifier("script_task_two", processor_final.bpmn_process_instance) + assert spiff_task is not None + assert spiff_task.state == TaskState.WAITING + assert spiff_task.data == {"my_var": "THE VAR"}