diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance.py index 5b35df3e..009a7486 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance.py @@ -205,7 +205,6 @@ class ProcessInstanceApi: next_task: Task | None, process_model_identifier: str, process_model_display_name: str, - completed_tasks: int, updated_at_in_seconds: int, ) -> None: """__init__.""" @@ -214,7 +213,6 @@ class ProcessInstanceApi: self.next_task = next_task # The next task that requires user input. self.process_model_identifier = process_model_identifier self.process_model_display_name = process_model_display_name - self.completed_tasks = completed_tasks self.updated_at_in_seconds = updated_at_in_seconds @@ -231,7 +229,6 @@ class ProcessInstanceApiSchema(Schema): "next_task", "process_model_identifier", "process_model_display_name", - "completed_tasks", "updated_at_in_seconds", ] unknown = INCLUDE @@ -248,7 +245,6 @@ class ProcessInstanceApiSchema(Schema): "next_task", "process_model_identifier", "process_model_display_name", - "completed_tasks", "updated_at_in_seconds", ] filtered_fields = {key: data[key] for key in keys} diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py b/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py index 58acee74..b8217fa3 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py @@ -124,14 +124,12 @@ def process_instance_run( processor = None try: - processor = ProcessInstanceProcessor(process_instance) - processor.do_engine_steps(save=True) + processor = ProcessInstanceService.run_process_intance_with_processor(process_instance) except ( ApiError, ProcessInstanceIsNotEnqueuedError, ProcessInstanceIsAlreadyLockedError, ) as e: - # import pdb; pdb.set_trace() ErrorHandlingService.handle_error(process_instance, e) raise e except Exception as e: @@ -139,7 +137,6 @@ def process_instance_run( # FIXME: this is going to point someone to the wrong task - it's misinformation for errors in sub-processes. # we need to recurse through all last tasks if the last task is a call activity or subprocess. if processor is not None: - # import pdb; pdb.set_trace() task = processor.bpmn_process_instance.last_task raise ApiError.from_task( error_code="unknown_exception", @@ -152,11 +149,17 @@ def process_instance_run( if not current_app.config["SPIFFWORKFLOW_BACKEND_RUN_BACKGROUND_SCHEDULER"]: MessageService.correlate_all_message_instances() - process_instance_api = ProcessInstanceService.processor_to_process_instance_api(processor) - process_instance_data = processor.get_data() - process_instance_metadata = ProcessInstanceApiSchema().dump(process_instance_api) - process_instance_metadata["data"] = process_instance_data - return Response(json.dumps(process_instance_metadata), status=200, mimetype="application/json") + # for mypy + if processor is not None: + process_instance_api = ProcessInstanceService.processor_to_process_instance_api(processor) + process_instance_data = processor.get_data() + process_instance_metadata = ProcessInstanceApiSchema().dump(process_instance_api) + process_instance_metadata["data"] = process_instance_data + return Response(json.dumps(process_instance_metadata), status=200, mimetype="application/json") + + # FIXME: this should never happen currently but it'd be ideal to always do this + # currently though it does not return next task so it cannnot be used to take the user to the next human task + return make_response(jsonify(process_instance), 200) def process_instance_terminate( diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/error_handling_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/error_handling_service.py index 6696f353..1ebc5202 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/error_handling_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/error_handling_service.py @@ -19,10 +19,10 @@ class ErrorHandlingService: MESSAGE_NAME = "SystemErrorMessage" @classmethod - def handle_error(cls, process_instance: ProcessInstanceModel, error: Union[ApiError, Exception]) -> None: + def handle_error(cls, process_instance: ProcessInstanceModel, error: Exception) -> None: """On unhandled exceptions, set instance.status based on model.fault_or_suspend_on_exception.""" process_model = ProcessModelService.get_process_model(process_instance.process_model_identifier) - cls._update_process_instance_in_database(process_instance, error, process_model.fault_or_suspend_on_exception) + cls._update_process_instance_in_database(process_instance, process_model.fault_or_suspend_on_exception) # Second, send a bpmn message out, but only if an exception notification address is provided # This will create a new Send Message with correlation keys on the recipients and the message @@ -35,9 +35,7 @@ class ErrorHandlingService: current_app.logger.error(e) @classmethod - def _update_process_instance_in_database(cls, process_instance: ProcessInstanceModel, error: Union[ApiError, Exception], fault_or_suspend_on_exception: str) -> None: - TaskService.add_event_to_process_instance(process_instance, ProcessInstanceEventType.process_instance_error.value, exception=error) - + def _update_process_instance_in_database(cls, process_instance: ProcessInstanceModel, fault_or_suspend_on_exception: str) -> None: # First, suspend or fault the instance if fault_or_suspend_on_exception == "suspend": cls._set_instance_status( @@ -55,7 +53,7 @@ class ErrorHandlingService: @staticmethod def _handle_system_notification( - error: Union[ApiError, Exception], + error: Exception, process_model: ProcessModelInfo, process_instance: ProcessInstanceModel, ) -> None: diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index 1e2a42f3..c4961722 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -108,6 +108,7 @@ from spiffworkflow_backend.services.task_service import JsonDataDict from spiffworkflow_backend.services.task_service import TaskService from spiffworkflow_backend.services.user_service import UserService from spiffworkflow_backend.services.workflow_execution_service import ( + ExecutionStrategyNotConfiguredError, execution_strategy_named, ) from spiffworkflow_backend.services.workflow_execution_service import ( @@ -162,9 +163,10 @@ class BoxedTaskDataBasedScriptEngineEnvironment(BoxedTaskDataEnvironment): # ty script: str, context: Dict[str, Any], external_methods: Optional[Dict[str, Any]] = None, - ) -> None: + ) -> bool: super().execute(script, context, external_methods) self._last_result = context + return True def user_defined_state(self, external_methods: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: return {} @@ -217,7 +219,7 @@ class NonTaskDataBasedScriptEngineEnvironment(BasePythonScriptEngineEnvironment) script: str, context: Dict[str, Any], external_methods: Optional[Dict[str, Any]] = None, - ) -> None: + ) -> bool: # TODO: once integrated look at the tests that fail without Box # context is task.data Box.convert_to_box(context) @@ -239,6 +241,7 @@ class NonTaskDataBasedScriptEngineEnvironment(BasePythonScriptEngineEnvironment) # the task data needs to be updated with the current state so data references can be resolved properly. # the state will be removed later once the task is completed. context.update(self.state) + return True def user_defined_state(self, external_methods: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: keys_to_filter = self.non_user_defined_keys @@ -318,13 +321,7 @@ class CustomBpmnScriptEngine(PythonScriptEngine): # type: ignore # This will overwrite the standard builtins default_globals.update(safe_globals) default_globals["__builtins__"]["__import__"] = _import - environment = CustomScriptEngineEnvironment(default_globals) - - # right now spiff is executing script tasks on ready so doing this - # so we know when something fails and we can save it to our database. - self.failing_spiff_task: Optional[SpiffTask] = None - super().__init__(environment=environment) def __get_augment_methods(self, task: Optional[SpiffTask]) -> Dict[str, Callable]: @@ -351,7 +348,6 @@ class CustomBpmnScriptEngine(PythonScriptEngine): # type: ignore expression: str, external_methods: Optional[dict[str, Any]] = None, ) -> Any: - """Evaluate.""" return self._evaluate(expression, task.data, task, external_methods) def _evaluate( @@ -361,7 +357,6 @@ class CustomBpmnScriptEngine(PythonScriptEngine): # type: ignore task: Optional[SpiffTask] = None, external_methods: Optional[Dict[str, Any]] = None, ) -> Any: - """_evaluate.""" methods = self.__get_augment_methods(task) if external_methods: methods.update(external_methods) @@ -381,17 +376,15 @@ class CustomBpmnScriptEngine(PythonScriptEngine): # type: ignore exception=exception, ) from exception - def execute(self, task: SpiffTask, script: str, external_methods: Any = None) -> None: - """Execute.""" + def execute(self, task: SpiffTask, script: str, external_methods: Any = None) -> bool: try: # reset failing task just in case - self.failing_spiff_task = None methods = self.__get_augment_methods(task) if external_methods: methods.update(external_methods) super().execute(task, script, methods) + return True except WorkflowException as e: - self.failing_spiff_task = task raise e except Exception as e: raise self.create_task_exec_exception(task, script, e) from e @@ -402,7 +395,6 @@ class CustomBpmnScriptEngine(PythonScriptEngine): # type: ignore operation_params: Dict[str, Any], task_data: Dict[str, Any], ) -> Any: - """CallService.""" return ServiceTaskDelegate.call_connector(operation_name, operation_params, task_data) @@ -1124,14 +1116,10 @@ class ProcessInstanceProcessor: """Saves the current state of this processor to the database.""" self.process_instance_model.spiff_serializer_version = self.SERIALIZER_VERSION - complete_states = [TaskState.CANCELLED, TaskState.COMPLETED] - user_tasks = list(self.get_all_user_tasks()) self.process_instance_model.status = self.get_status().value current_app.logger.debug( f"the_status: {self.process_instance_model.status} for instance {self.process_instance_model.id}" ) - self.process_instance_model.total_tasks = len(user_tasks) - self.process_instance_model.completed_tasks = sum(1 for t in user_tasks if t.state in complete_states) if self.process_instance_model.start_in_seconds is None: self.process_instance_model.start_in_seconds = round(time.time()) @@ -1693,6 +1681,8 @@ class ProcessInstanceProcessor: if execution_strategy_name is None: execution_strategy_name = current_app.config["SPIFFWORKFLOW_BACKEND_ENGINE_STEP_DEFAULT_STRATEGY_WEB"] + if execution_strategy_name is None: + raise ExecutionStrategyNotConfiguredError("SPIFFWORKFLOW_BACKEND_ENGINE_STEP_DEFAULT_STRATEGY_WEB has not been set") execution_strategy = execution_strategy_named(execution_strategy_name, task_model_delegate) execution_service = WorkflowExecutionService( @@ -1702,16 +1692,7 @@ class ProcessInstanceProcessor: self._script_engine.environment.finalize_result, self.save, ) - try: - execution_service.run(exit_at, save) - finally: - # clear out failling spiff tasks here since the ProcessInstanceProcessor creates an instance of the - # script engine on a class variable. - if ( - hasattr(self._script_engine, "failing_spiff_task") - and self._script_engine.failing_spiff_task is not None - ): - self._script_engine.failing_spiff_task = None + execution_service.run(exit_at, save) @classmethod def get_tasks_with_data(cls, bpmn_process_instance: BpmnWorkflow) -> List[SpiffTask]: @@ -1932,7 +1913,6 @@ class ProcessInstanceProcessor: return [t for t in all_tasks if not self.bpmn_process_instance._is_engine_task(t.task_spec)] def get_all_completed_tasks(self) -> list[SpiffTask]: - """Get_all_completed_tasks.""" all_tasks = self.bpmn_process_instance.get_tasks(TaskState.ANY_MASK) return [ t diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py index 59915eb1..587b6e80 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_queue_service.py @@ -1,5 +1,6 @@ import contextlib import time +from spiffworkflow_backend.services.workflow_execution_service import WorkflowExecutionServiceError from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventType from spiffworkflow_backend.services.task_service import TaskService from typing import Generator @@ -99,7 +100,10 @@ class ProcessInstanceQueueService: except Exception as ex: process_instance.status = ProcessInstanceStatus.error.value db.session.add(process_instance) - TaskService.add_event_to_process_instance(process_instance, ProcessInstanceEventType.process_instance_error.value, exception=ex) + # these events are handled in the WorkflowExecutionService. + # that is, we don't need to add error_detail records here, etc. + if not isinstance(ex, WorkflowExecutionServiceError): + TaskService.add_event_to_process_instance(process_instance, ProcessInstanceEventType.process_instance_error.value, exception=ex) db.session.commit() raise ex finally: diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py index c31bb447..aba946c7 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_service.py @@ -113,20 +113,9 @@ class ProcessInstanceService: .all() ) for process_instance in records: + current_app.logger.info(f"Processing process_instance {process_instance.id}") try: - current_app.logger.info(f"Processing process_instance {process_instance.id}") - with ProcessInstanceQueueService.dequeued(process_instance): - processor = ProcessInstanceProcessor(process_instance) - if cls.can_optimistically_skip(processor, status_value): - current_app.logger.info(f"Optimistically skipped process_instance {process_instance.id}") - continue - - db.session.refresh(process_instance) - if process_instance.status == status_value: - execution_strategy_name = current_app.config[ - "SPIFFWORKFLOW_BACKEND_ENGINE_STEP_DEFAULT_STRATEGY_BACKGROUND" - ] - processor.do_engine_steps(save=True, execution_strategy_name=execution_strategy_name) + cls.run_process_intance_with_processor(process_instance, status_value=status_value) except ProcessInstanceIsAlreadyLockedError: continue except Exception as e: @@ -137,6 +126,24 @@ class ProcessInstanceService: ) current_app.logger.error(error_message) + @classmethod + def run_process_intance_with_processor(cls, process_instance: ProcessInstanceModel, status_value: Optional[str] = None) -> Optional[ProcessInstanceProcessor]: + processor = None + with ProcessInstanceQueueService.dequeued(process_instance): + processor = ProcessInstanceProcessor(process_instance) + if status_value and cls.can_optimistically_skip(processor, status_value): + current_app.logger.info(f"Optimistically skipped process_instance {process_instance.id}") + return None + + db.session.refresh(process_instance) + if status_value is None or process_instance.status == status_value: + execution_strategy_name = current_app.config[ + "SPIFFWORKFLOW_BACKEND_ENGINE_STEP_DEFAULT_STRATEGY_BACKGROUND" + ] + processor.do_engine_steps(save=True, execution_strategy_name=execution_strategy_name) + + return processor + @staticmethod def processor_to_process_instance_api( processor: ProcessInstanceProcessor, next_task: None = None @@ -155,7 +162,6 @@ class ProcessInstanceService: next_task=None, process_model_identifier=processor.process_model_identifier, process_model_display_name=processor.process_model_display_name, - completed_tasks=processor.process_instance_model.completed_tasks, updated_at_in_seconds=processor.process_instance_model.updated_at_in_seconds, ) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py index 73baa738..8e5a95cd 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py @@ -3,6 +3,7 @@ import json from spiffworkflow_backend.exceptions.api_error import ApiError from SpiffWorkflow.exceptions import WorkflowTaskException # type: ignore from flask import g +from spiffworkflow_backend.models import process_instance_error_detail from spiffworkflow_backend.models.process_instance_error_detail import ProcessInstanceErrorDetailModel import traceback import time @@ -117,7 +118,6 @@ class TaskService: def update_task_model_with_spiff_task( self, spiff_task: SpiffTask, - task_failed: bool = False, start_and_end_times: Optional[StartAndEndTimes] = None, ) -> TaskModel: new_bpmn_process = None @@ -158,20 +158,11 @@ class TaskService: task_model.start_in_seconds = start_and_end_times["start_in_seconds"] task_model.end_in_seconds = start_and_end_times["end_in_seconds"] - if task_model.state == "COMPLETED" or task_failed: + # let failed tasks raise and we will log the event then + if task_model.state == "COMPLETED": event_type = ProcessInstanceEventType.task_completed.value - if task_failed or task_model.state == TaskState.ERROR: - event_type = ProcessInstanceEventType.task_failed.value - - # FIXME: some failed tasks will currently not have either timestamp since we only hook into spiff when tasks complete - # which script tasks execute when READY. timestamp = task_model.end_in_seconds or task_model.start_in_seconds or time.time() - process_instance_event = ProcessInstanceEventModel( - task_guid=task_model.guid, - process_instance_id=self.process_instance.id, - event_type=event_type, - timestamp=timestamp, - ) + process_instance_event, _process_instance_error_detail = TaskService.add_event_to_process_instance(self.process_instance, event_type, task_guid=task_model.guid, timestamp=timestamp, add_to_db_session=False) self.process_instance_events[task_model.guid] = process_instance_event self.update_bpmn_process(spiff_task.workflow, bpmn_process) @@ -607,16 +598,24 @@ class TaskService: task_guid: Optional[str] = None, user_id: Optional[int] = None, exception: Optional[Exception] = None, - ) -> None: + timestamp: Optional[float] = None, + add_to_db_session: Optional[bool] = True, + ) -> Tuple[ProcessInstanceEventModel, Optional[ProcessInstanceErrorDetailModel]]: if user_id is None and hasattr(g, "user") and g.user: user_id = g.user.id + if timestamp is None: + timestamp = time.time() + process_instance_event = ProcessInstanceEventModel( - process_instance_id=process_instance.id, event_type=event_type, timestamp=time.time(), user_id=user_id + process_instance_id=process_instance.id, event_type=event_type, timestamp=timestamp, user_id=user_id ) if task_guid: process_instance_event.task_guid = task_guid - db.session.add(process_instance_event) + if add_to_db_session: + db.session.add(process_instance_event) + + process_instance_error_detail = None if exception is not None: # truncate to avoid database errors on large values. We observed that text in mysql is 65K. stacktrace = traceback.format_exc().split("\n") @@ -641,4 +640,7 @@ class TaskService: task_trace=task_trace, task_offset=task_offset, ) - db.session.add(process_instance_error_detail) + + if add_to_db_session: + db.session.add(process_instance_error_detail) + return (process_instance_event, process_instance_error_detail) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py index 8ccd5456..cabc6735 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py @@ -1,5 +1,9 @@ +from __future__ import annotations import copy import time +from abc import abstractmethod +from typing import Type, TypeVar +from SpiffWorkflow.exceptions import WorkflowTaskException # type: ignore from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventType from typing import Callable from typing import Optional @@ -28,21 +32,69 @@ from spiffworkflow_backend.services.task_service import StartAndEndTimes from spiffworkflow_backend.services.task_service import TaskService + + +class WorkflowExecutionServiceError(WorkflowTaskException): # type: ignore + @classmethod + def from_workflow_task_exception( + cls, + workflow_task_exception: WorkflowTaskException, + ) -> WorkflowExecutionServiceError: + return cls( + error_msg=str(workflow_task_exception), + task=workflow_task_exception.task, + exception=workflow_task_exception, + line_number=workflow_task_exception.line_number, + offset=workflow_task_exception.offset, + error_line=workflow_task_exception.error_line, + ) + + +class ExecutionStrategyNotConfiguredError(Exception): + pass + + class EngineStepDelegate: """Interface of sorts for a concrete engine step delegate.""" + @abstractmethod def will_complete_task(self, spiff_task: SpiffTask) -> None: pass + @abstractmethod def did_complete_task(self, spiff_task: SpiffTask) -> None: pass + @abstractmethod def save(self, bpmn_process_instance: BpmnWorkflow, commit: bool = False) -> None: pass + @abstractmethod def after_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> None: pass + @abstractmethod + def on_exception(self, bpmn_process_instance: BpmnWorkflow) -> None: + pass + + +class ExecutionStrategy: + """Interface of sorts for a concrete execution strategy.""" + + def __init__(self, delegate: EngineStepDelegate): + """__init__.""" + self.delegate = delegate + + @abstractmethod + def spiff_run(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None: + pass + + def on_exception(self, bpmn_process_instance: BpmnWorkflow) -> None: + self.delegate.on_exception(bpmn_process_instance) + + def save(self, bpmn_process_instance: BpmnWorkflow) -> None: + self.delegate.save(bpmn_process_instance) + class TaskModelSavingDelegate(EngineStepDelegate): """Engine step delegate that takes care of saving a task model to the database. @@ -104,29 +156,12 @@ class TaskModelSavingDelegate(EngineStepDelegate): self.secondary_engine_step_delegate.did_complete_task(spiff_task) def save(self, bpmn_process_instance: BpmnWorkflow, _commit: bool = True) -> None: - script_engine = bpmn_process_instance.script_engine - if hasattr(script_engine, "failing_spiff_task") and script_engine.failing_spiff_task is not None: - failing_spiff_task = script_engine.failing_spiff_task - self.task_service.update_task_model_with_spiff_task(failing_spiff_task, task_failed=True) - self.task_service.process_spiff_task_parent_subprocess_tasks(failing_spiff_task) - self.task_service.process_spiff_task_children(failing_spiff_task) - self.task_service.save_objects_to_database() if self.secondary_engine_step_delegate: self.secondary_engine_step_delegate.save(bpmn_process_instance, commit=False) db.session.commit() - def _add_children(self, spiff_task: SpiffTask) -> None: - for child_spiff_task in spiff_task.children: - self.spiff_tasks_to_process.add(child_spiff_task.id) - self._add_children(child_spiff_task) - - def _add_parents(self, spiff_task: SpiffTask) -> None: - if spiff_task.parent and spiff_task.parent.task_spec.name != "Root": - self.spiff_tasks_to_process.add(spiff_task.parent.id) - self._add_parents(spiff_task.parent) - def after_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> None: if self._should_update_task_model(): # NOTE: process-all-tasks: All tests pass with this but it's less efficient and would be nice to replace @@ -187,6 +222,19 @@ class TaskModelSavingDelegate(EngineStepDelegate): # self.task_service.process_spiff_task_children(self.last_completed_spiff_task) # self.task_service.process_spiff_task_parent_subprocess_tasks(self.last_completed_spiff_task) + def on_exception(self, bpmn_process_instance: BpmnWorkflow) -> None: + self.after_engine_steps(bpmn_process_instance) + + def _add_children(self, spiff_task: SpiffTask) -> None: + for child_spiff_task in spiff_task.children: + self.spiff_tasks_to_process.add(child_spiff_task.id) + self._add_children(child_spiff_task) + + def _add_parents(self, spiff_task: SpiffTask) -> None: + if spiff_task.parent and spiff_task.parent.task_spec.name != "Root": + self.spiff_tasks_to_process.add(spiff_task.parent.id) + self._add_parents(spiff_task.parent) + def _should_update_task_model(self) -> bool: """We need to figure out if we have previously save task info on this process intance. @@ -196,20 +244,6 @@ class TaskModelSavingDelegate(EngineStepDelegate): return True -class ExecutionStrategy: - """Interface of sorts for a concrete execution strategy.""" - - def __init__(self, delegate: EngineStepDelegate): - """__init__.""" - self.delegate = delegate - - def spiff_run(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None: - pass - - def save(self, bpmn_process_instance: BpmnWorkflow) -> None: - self.delegate.save(bpmn_process_instance) - - class GreedyExecutionStrategy(ExecutionStrategy): """The common execution strategy. This will greedily run all engine steps without stopping.""" @@ -329,8 +363,12 @@ class WorkflowExecutionService: self.process_bpmn_messages() self.queue_waiting_receive_messages() + except WorkflowTaskException as wte: + TaskService.add_event_to_process_instance(self.process_instance_model, ProcessInstanceEventType.task_failed.value, exception=wte, task_guid=str(wte.task.id)) + self.execution_strategy.on_exception(self.bpmn_process_instance) + raise WorkflowExecutionServiceError.from_workflow_task_exception(wte) except SpiffWorkflowException as swe: - TaskService.add_event_to_process_instance(self.process_instance_model, ProcessInstanceEventType.task_failed.value, exception=swe, task_guid=str(swe.task.id)) + self.execution_strategy.on_exception(self.bpmn_process_instance) raise ApiError.from_workflow_exception("task_error", str(swe), swe) from swe finally: diff --git a/spiffworkflow-frontend/src/components/ReactDiagramEditor.tsx b/spiffworkflow-frontend/src/components/ReactDiagramEditor.tsx index 9b9307f8..6cf486ed 100644 --- a/spiffworkflow-frontend/src/components/ReactDiagramEditor.tsx +++ b/spiffworkflow-frontend/src/components/ReactDiagramEditor.tsx @@ -69,6 +69,7 @@ type OwnProps = { readyOrWaitingProcessInstanceTasks?: Task[] | null; completedProcessInstanceTasks?: Task[] | null; cancelledProcessInstanceTasks?: Task[] | null; + erroredProcessInstanceTasks?: Task[] | null; saveDiagram?: (..._args: any[]) => any; onDeleteFile?: (..._args: any[]) => any; isPrimaryFile?: boolean; @@ -96,6 +97,7 @@ export default function ReactDiagramEditor({ readyOrWaitingProcessInstanceTasks, completedProcessInstanceTasks, cancelledProcessInstanceTasks, + erroredProcessInstanceTasks, saveDiagram, onDeleteFile, isPrimaryFile, @@ -457,6 +459,19 @@ export default function ReactDiagramEditor({ ); }); } + if (erroredProcessInstanceTasks) { + const bpmnProcessIdentifiers = getBpmnProcessIdentifiers( + canvas.getRootElement() + ); + erroredProcessInstanceTasks.forEach((erroredTask) => { + highlightBpmnIoElement( + canvas, + erroredTask, + 'errored-task-highlight', + bpmnProcessIdentifiers + ); + }); + } } function displayDiagram( diff --git a/spiffworkflow-frontend/src/index.css b/spiffworkflow-frontend/src/index.css index d98187cd..78c034e5 100644 --- a/spiffworkflow-frontend/src/index.css +++ b/spiffworkflow-frontend/src/index.css @@ -146,6 +146,10 @@ code { fill: blue !important; opacity: .2; } +.errored-task-highlight:not(.djs-connection) .djs-visual > :nth-child(1) { + fill: red !important; + opacity: .2; +} .accordion-item-label { vertical-align: middle; diff --git a/spiffworkflow-frontend/src/interfaces.ts b/spiffworkflow-frontend/src/interfaces.ts index f0b2f48c..022469e6 100644 --- a/spiffworkflow-frontend/src/interfaces.ts +++ b/spiffworkflow-frontend/src/interfaces.ts @@ -59,6 +59,7 @@ export interface TaskIds { completed: Task[]; readyOrWaiting: Task[]; cancelled: Task[]; + errored: Task[]; } export interface ProcessInstanceTask { diff --git a/spiffworkflow-frontend/src/routes/ProcessInstanceShow.tsx b/spiffworkflow-frontend/src/routes/ProcessInstanceShow.tsx index c294ae48..263631ea 100644 --- a/spiffworkflow-frontend/src/routes/ProcessInstanceShow.tsx +++ b/spiffworkflow-frontend/src/routes/ProcessInstanceShow.tsx @@ -235,6 +235,7 @@ export default function ProcessInstanceShow({ variant }: OwnProps) { completed: [], readyOrWaiting: [], cancelled: [], + errored: [], }; if (tasks) { tasks.forEach(function getUserTasksElement(task: Task) { @@ -244,6 +245,8 @@ export default function ProcessInstanceShow({ variant }: OwnProps) { taskIds.readyOrWaiting.push(task); } else if (task.state === 'CANCELLED') { taskIds.cancelled.push(task); + } else if (task.state === 'ERROR') { + taskIds.errored.push(task); } return null; }); @@ -1159,6 +1162,7 @@ export default function ProcessInstanceShow({ variant }: OwnProps) { readyOrWaitingProcessInstanceTasks={taskIds.readyOrWaiting} completedProcessInstanceTasks={taskIds.completed} cancelledProcessInstanceTasks={taskIds.cancelled} + erroredProcessInstanceTasks={taskIds.errored} diagramType="readonly" onElementClick={handleClickedDiagramTask} />