properly save task and instance exceptions to the db and display in the frontend w/ burnettk
This commit is contained in:
parent
2e1620a519
commit
dc2c1be497
|
@ -205,7 +205,6 @@ class ProcessInstanceApi:
|
|||
next_task: Task | None,
|
||||
process_model_identifier: str,
|
||||
process_model_display_name: str,
|
||||
completed_tasks: int,
|
||||
updated_at_in_seconds: int,
|
||||
) -> None:
|
||||
"""__init__."""
|
||||
|
@ -214,7 +213,6 @@ class ProcessInstanceApi:
|
|||
self.next_task = next_task # The next task that requires user input.
|
||||
self.process_model_identifier = process_model_identifier
|
||||
self.process_model_display_name = process_model_display_name
|
||||
self.completed_tasks = completed_tasks
|
||||
self.updated_at_in_seconds = updated_at_in_seconds
|
||||
|
||||
|
||||
|
@ -231,7 +229,6 @@ class ProcessInstanceApiSchema(Schema):
|
|||
"next_task",
|
||||
"process_model_identifier",
|
||||
"process_model_display_name",
|
||||
"completed_tasks",
|
||||
"updated_at_in_seconds",
|
||||
]
|
||||
unknown = INCLUDE
|
||||
|
@ -248,7 +245,6 @@ class ProcessInstanceApiSchema(Schema):
|
|||
"next_task",
|
||||
"process_model_identifier",
|
||||
"process_model_display_name",
|
||||
"completed_tasks",
|
||||
"updated_at_in_seconds",
|
||||
]
|
||||
filtered_fields = {key: data[key] for key in keys}
|
||||
|
|
|
@ -124,14 +124,12 @@ def process_instance_run(
|
|||
|
||||
processor = None
|
||||
try:
|
||||
processor = ProcessInstanceProcessor(process_instance)
|
||||
processor.do_engine_steps(save=True)
|
||||
processor = ProcessInstanceService.run_process_intance_with_processor(process_instance)
|
||||
except (
|
||||
ApiError,
|
||||
ProcessInstanceIsNotEnqueuedError,
|
||||
ProcessInstanceIsAlreadyLockedError,
|
||||
) as e:
|
||||
# import pdb; pdb.set_trace()
|
||||
ErrorHandlingService.handle_error(process_instance, e)
|
||||
raise e
|
||||
except Exception as e:
|
||||
|
@ -139,7 +137,6 @@ def process_instance_run(
|
|||
# FIXME: this is going to point someone to the wrong task - it's misinformation for errors in sub-processes.
|
||||
# we need to recurse through all last tasks if the last task is a call activity or subprocess.
|
||||
if processor is not None:
|
||||
# import pdb; pdb.set_trace()
|
||||
task = processor.bpmn_process_instance.last_task
|
||||
raise ApiError.from_task(
|
||||
error_code="unknown_exception",
|
||||
|
@ -152,11 +149,17 @@ def process_instance_run(
|
|||
if not current_app.config["SPIFFWORKFLOW_BACKEND_RUN_BACKGROUND_SCHEDULER"]:
|
||||
MessageService.correlate_all_message_instances()
|
||||
|
||||
process_instance_api = ProcessInstanceService.processor_to_process_instance_api(processor)
|
||||
process_instance_data = processor.get_data()
|
||||
process_instance_metadata = ProcessInstanceApiSchema().dump(process_instance_api)
|
||||
process_instance_metadata["data"] = process_instance_data
|
||||
return Response(json.dumps(process_instance_metadata), status=200, mimetype="application/json")
|
||||
# for mypy
|
||||
if processor is not None:
|
||||
process_instance_api = ProcessInstanceService.processor_to_process_instance_api(processor)
|
||||
process_instance_data = processor.get_data()
|
||||
process_instance_metadata = ProcessInstanceApiSchema().dump(process_instance_api)
|
||||
process_instance_metadata["data"] = process_instance_data
|
||||
return Response(json.dumps(process_instance_metadata), status=200, mimetype="application/json")
|
||||
|
||||
# FIXME: this should never happen currently but it'd be ideal to always do this
|
||||
# currently though it does not return next task so it cannnot be used to take the user to the next human task
|
||||
return make_response(jsonify(process_instance), 200)
|
||||
|
||||
|
||||
def process_instance_terminate(
|
||||
|
|
|
@ -19,10 +19,10 @@ class ErrorHandlingService:
|
|||
MESSAGE_NAME = "SystemErrorMessage"
|
||||
|
||||
@classmethod
|
||||
def handle_error(cls, process_instance: ProcessInstanceModel, error: Union[ApiError, Exception]) -> None:
|
||||
def handle_error(cls, process_instance: ProcessInstanceModel, error: Exception) -> None:
|
||||
"""On unhandled exceptions, set instance.status based on model.fault_or_suspend_on_exception."""
|
||||
process_model = ProcessModelService.get_process_model(process_instance.process_model_identifier)
|
||||
cls._update_process_instance_in_database(process_instance, error, process_model.fault_or_suspend_on_exception)
|
||||
cls._update_process_instance_in_database(process_instance, process_model.fault_or_suspend_on_exception)
|
||||
|
||||
# Second, send a bpmn message out, but only if an exception notification address is provided
|
||||
# This will create a new Send Message with correlation keys on the recipients and the message
|
||||
|
@ -35,9 +35,7 @@ class ErrorHandlingService:
|
|||
current_app.logger.error(e)
|
||||
|
||||
@classmethod
|
||||
def _update_process_instance_in_database(cls, process_instance: ProcessInstanceModel, error: Union[ApiError, Exception], fault_or_suspend_on_exception: str) -> None:
|
||||
TaskService.add_event_to_process_instance(process_instance, ProcessInstanceEventType.process_instance_error.value, exception=error)
|
||||
|
||||
def _update_process_instance_in_database(cls, process_instance: ProcessInstanceModel, fault_or_suspend_on_exception: str) -> None:
|
||||
# First, suspend or fault the instance
|
||||
if fault_or_suspend_on_exception == "suspend":
|
||||
cls._set_instance_status(
|
||||
|
@ -55,7 +53,7 @@ class ErrorHandlingService:
|
|||
|
||||
@staticmethod
|
||||
def _handle_system_notification(
|
||||
error: Union[ApiError, Exception],
|
||||
error: Exception,
|
||||
process_model: ProcessModelInfo,
|
||||
process_instance: ProcessInstanceModel,
|
||||
) -> None:
|
||||
|
|
|
@ -108,6 +108,7 @@ from spiffworkflow_backend.services.task_service import JsonDataDict
|
|||
from spiffworkflow_backend.services.task_service import TaskService
|
||||
from spiffworkflow_backend.services.user_service import UserService
|
||||
from spiffworkflow_backend.services.workflow_execution_service import (
|
||||
ExecutionStrategyNotConfiguredError,
|
||||
execution_strategy_named,
|
||||
)
|
||||
from spiffworkflow_backend.services.workflow_execution_service import (
|
||||
|
@ -162,9 +163,10 @@ class BoxedTaskDataBasedScriptEngineEnvironment(BoxedTaskDataEnvironment): # ty
|
|||
script: str,
|
||||
context: Dict[str, Any],
|
||||
external_methods: Optional[Dict[str, Any]] = None,
|
||||
) -> None:
|
||||
) -> bool:
|
||||
super().execute(script, context, external_methods)
|
||||
self._last_result = context
|
||||
return True
|
||||
|
||||
def user_defined_state(self, external_methods: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
|
||||
return {}
|
||||
|
@ -217,7 +219,7 @@ class NonTaskDataBasedScriptEngineEnvironment(BasePythonScriptEngineEnvironment)
|
|||
script: str,
|
||||
context: Dict[str, Any],
|
||||
external_methods: Optional[Dict[str, Any]] = None,
|
||||
) -> None:
|
||||
) -> bool:
|
||||
# TODO: once integrated look at the tests that fail without Box
|
||||
# context is task.data
|
||||
Box.convert_to_box(context)
|
||||
|
@ -239,6 +241,7 @@ class NonTaskDataBasedScriptEngineEnvironment(BasePythonScriptEngineEnvironment)
|
|||
# the task data needs to be updated with the current state so data references can be resolved properly.
|
||||
# the state will be removed later once the task is completed.
|
||||
context.update(self.state)
|
||||
return True
|
||||
|
||||
def user_defined_state(self, external_methods: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
|
||||
keys_to_filter = self.non_user_defined_keys
|
||||
|
@ -318,13 +321,7 @@ class CustomBpmnScriptEngine(PythonScriptEngine): # type: ignore
|
|||
# This will overwrite the standard builtins
|
||||
default_globals.update(safe_globals)
|
||||
default_globals["__builtins__"]["__import__"] = _import
|
||||
|
||||
environment = CustomScriptEngineEnvironment(default_globals)
|
||||
|
||||
# right now spiff is executing script tasks on ready so doing this
|
||||
# so we know when something fails and we can save it to our database.
|
||||
self.failing_spiff_task: Optional[SpiffTask] = None
|
||||
|
||||
super().__init__(environment=environment)
|
||||
|
||||
def __get_augment_methods(self, task: Optional[SpiffTask]) -> Dict[str, Callable]:
|
||||
|
@ -351,7 +348,6 @@ class CustomBpmnScriptEngine(PythonScriptEngine): # type: ignore
|
|||
expression: str,
|
||||
external_methods: Optional[dict[str, Any]] = None,
|
||||
) -> Any:
|
||||
"""Evaluate."""
|
||||
return self._evaluate(expression, task.data, task, external_methods)
|
||||
|
||||
def _evaluate(
|
||||
|
@ -361,7 +357,6 @@ class CustomBpmnScriptEngine(PythonScriptEngine): # type: ignore
|
|||
task: Optional[SpiffTask] = None,
|
||||
external_methods: Optional[Dict[str, Any]] = None,
|
||||
) -> Any:
|
||||
"""_evaluate."""
|
||||
methods = self.__get_augment_methods(task)
|
||||
if external_methods:
|
||||
methods.update(external_methods)
|
||||
|
@ -381,17 +376,15 @@ class CustomBpmnScriptEngine(PythonScriptEngine): # type: ignore
|
|||
exception=exception,
|
||||
) from exception
|
||||
|
||||
def execute(self, task: SpiffTask, script: str, external_methods: Any = None) -> None:
|
||||
"""Execute."""
|
||||
def execute(self, task: SpiffTask, script: str, external_methods: Any = None) -> bool:
|
||||
try:
|
||||
# reset failing task just in case
|
||||
self.failing_spiff_task = None
|
||||
methods = self.__get_augment_methods(task)
|
||||
if external_methods:
|
||||
methods.update(external_methods)
|
||||
super().execute(task, script, methods)
|
||||
return True
|
||||
except WorkflowException as e:
|
||||
self.failing_spiff_task = task
|
||||
raise e
|
||||
except Exception as e:
|
||||
raise self.create_task_exec_exception(task, script, e) from e
|
||||
|
@ -402,7 +395,6 @@ class CustomBpmnScriptEngine(PythonScriptEngine): # type: ignore
|
|||
operation_params: Dict[str, Any],
|
||||
task_data: Dict[str, Any],
|
||||
) -> Any:
|
||||
"""CallService."""
|
||||
return ServiceTaskDelegate.call_connector(operation_name, operation_params, task_data)
|
||||
|
||||
|
||||
|
@ -1124,14 +1116,10 @@ class ProcessInstanceProcessor:
|
|||
"""Saves the current state of this processor to the database."""
|
||||
self.process_instance_model.spiff_serializer_version = self.SERIALIZER_VERSION
|
||||
|
||||
complete_states = [TaskState.CANCELLED, TaskState.COMPLETED]
|
||||
user_tasks = list(self.get_all_user_tasks())
|
||||
self.process_instance_model.status = self.get_status().value
|
||||
current_app.logger.debug(
|
||||
f"the_status: {self.process_instance_model.status} for instance {self.process_instance_model.id}"
|
||||
)
|
||||
self.process_instance_model.total_tasks = len(user_tasks)
|
||||
self.process_instance_model.completed_tasks = sum(1 for t in user_tasks if t.state in complete_states)
|
||||
|
||||
if self.process_instance_model.start_in_seconds is None:
|
||||
self.process_instance_model.start_in_seconds = round(time.time())
|
||||
|
@ -1693,6 +1681,8 @@ class ProcessInstanceProcessor:
|
|||
|
||||
if execution_strategy_name is None:
|
||||
execution_strategy_name = current_app.config["SPIFFWORKFLOW_BACKEND_ENGINE_STEP_DEFAULT_STRATEGY_WEB"]
|
||||
if execution_strategy_name is None:
|
||||
raise ExecutionStrategyNotConfiguredError("SPIFFWORKFLOW_BACKEND_ENGINE_STEP_DEFAULT_STRATEGY_WEB has not been set")
|
||||
|
||||
execution_strategy = execution_strategy_named(execution_strategy_name, task_model_delegate)
|
||||
execution_service = WorkflowExecutionService(
|
||||
|
@ -1702,16 +1692,7 @@ class ProcessInstanceProcessor:
|
|||
self._script_engine.environment.finalize_result,
|
||||
self.save,
|
||||
)
|
||||
try:
|
||||
execution_service.run(exit_at, save)
|
||||
finally:
|
||||
# clear out failling spiff tasks here since the ProcessInstanceProcessor creates an instance of the
|
||||
# script engine on a class variable.
|
||||
if (
|
||||
hasattr(self._script_engine, "failing_spiff_task")
|
||||
and self._script_engine.failing_spiff_task is not None
|
||||
):
|
||||
self._script_engine.failing_spiff_task = None
|
||||
execution_service.run(exit_at, save)
|
||||
|
||||
@classmethod
|
||||
def get_tasks_with_data(cls, bpmn_process_instance: BpmnWorkflow) -> List[SpiffTask]:
|
||||
|
@ -1932,7 +1913,6 @@ class ProcessInstanceProcessor:
|
|||
return [t for t in all_tasks if not self.bpmn_process_instance._is_engine_task(t.task_spec)]
|
||||
|
||||
def get_all_completed_tasks(self) -> list[SpiffTask]:
|
||||
"""Get_all_completed_tasks."""
|
||||
all_tasks = self.bpmn_process_instance.get_tasks(TaskState.ANY_MASK)
|
||||
return [
|
||||
t
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
import contextlib
|
||||
import time
|
||||
from spiffworkflow_backend.services.workflow_execution_service import WorkflowExecutionServiceError
|
||||
from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventType
|
||||
from spiffworkflow_backend.services.task_service import TaskService
|
||||
from typing import Generator
|
||||
|
@ -99,7 +100,10 @@ class ProcessInstanceQueueService:
|
|||
except Exception as ex:
|
||||
process_instance.status = ProcessInstanceStatus.error.value
|
||||
db.session.add(process_instance)
|
||||
TaskService.add_event_to_process_instance(process_instance, ProcessInstanceEventType.process_instance_error.value, exception=ex)
|
||||
# these events are handled in the WorkflowExecutionService.
|
||||
# that is, we don't need to add error_detail records here, etc.
|
||||
if not isinstance(ex, WorkflowExecutionServiceError):
|
||||
TaskService.add_event_to_process_instance(process_instance, ProcessInstanceEventType.process_instance_error.value, exception=ex)
|
||||
db.session.commit()
|
||||
raise ex
|
||||
finally:
|
||||
|
|
|
@ -113,20 +113,9 @@ class ProcessInstanceService:
|
|||
.all()
|
||||
)
|
||||
for process_instance in records:
|
||||
current_app.logger.info(f"Processing process_instance {process_instance.id}")
|
||||
try:
|
||||
current_app.logger.info(f"Processing process_instance {process_instance.id}")
|
||||
with ProcessInstanceQueueService.dequeued(process_instance):
|
||||
processor = ProcessInstanceProcessor(process_instance)
|
||||
if cls.can_optimistically_skip(processor, status_value):
|
||||
current_app.logger.info(f"Optimistically skipped process_instance {process_instance.id}")
|
||||
continue
|
||||
|
||||
db.session.refresh(process_instance)
|
||||
if process_instance.status == status_value:
|
||||
execution_strategy_name = current_app.config[
|
||||
"SPIFFWORKFLOW_BACKEND_ENGINE_STEP_DEFAULT_STRATEGY_BACKGROUND"
|
||||
]
|
||||
processor.do_engine_steps(save=True, execution_strategy_name=execution_strategy_name)
|
||||
cls.run_process_intance_with_processor(process_instance, status_value=status_value)
|
||||
except ProcessInstanceIsAlreadyLockedError:
|
||||
continue
|
||||
except Exception as e:
|
||||
|
@ -137,6 +126,24 @@ class ProcessInstanceService:
|
|||
)
|
||||
current_app.logger.error(error_message)
|
||||
|
||||
@classmethod
|
||||
def run_process_intance_with_processor(cls, process_instance: ProcessInstanceModel, status_value: Optional[str] = None) -> Optional[ProcessInstanceProcessor]:
|
||||
processor = None
|
||||
with ProcessInstanceQueueService.dequeued(process_instance):
|
||||
processor = ProcessInstanceProcessor(process_instance)
|
||||
if status_value and cls.can_optimistically_skip(processor, status_value):
|
||||
current_app.logger.info(f"Optimistically skipped process_instance {process_instance.id}")
|
||||
return None
|
||||
|
||||
db.session.refresh(process_instance)
|
||||
if status_value is None or process_instance.status == status_value:
|
||||
execution_strategy_name = current_app.config[
|
||||
"SPIFFWORKFLOW_BACKEND_ENGINE_STEP_DEFAULT_STRATEGY_BACKGROUND"
|
||||
]
|
||||
processor.do_engine_steps(save=True, execution_strategy_name=execution_strategy_name)
|
||||
|
||||
return processor
|
||||
|
||||
@staticmethod
|
||||
def processor_to_process_instance_api(
|
||||
processor: ProcessInstanceProcessor, next_task: None = None
|
||||
|
@ -155,7 +162,6 @@ class ProcessInstanceService:
|
|||
next_task=None,
|
||||
process_model_identifier=processor.process_model_identifier,
|
||||
process_model_display_name=processor.process_model_display_name,
|
||||
completed_tasks=processor.process_instance_model.completed_tasks,
|
||||
updated_at_in_seconds=processor.process_instance_model.updated_at_in_seconds,
|
||||
)
|
||||
|
||||
|
|
|
@ -3,6 +3,7 @@ import json
|
|||
from spiffworkflow_backend.exceptions.api_error import ApiError
|
||||
from SpiffWorkflow.exceptions import WorkflowTaskException # type: ignore
|
||||
from flask import g
|
||||
from spiffworkflow_backend.models import process_instance_error_detail
|
||||
from spiffworkflow_backend.models.process_instance_error_detail import ProcessInstanceErrorDetailModel
|
||||
import traceback
|
||||
import time
|
||||
|
@ -117,7 +118,6 @@ class TaskService:
|
|||
def update_task_model_with_spiff_task(
|
||||
self,
|
||||
spiff_task: SpiffTask,
|
||||
task_failed: bool = False,
|
||||
start_and_end_times: Optional[StartAndEndTimes] = None,
|
||||
) -> TaskModel:
|
||||
new_bpmn_process = None
|
||||
|
@ -158,20 +158,11 @@ class TaskService:
|
|||
task_model.start_in_seconds = start_and_end_times["start_in_seconds"]
|
||||
task_model.end_in_seconds = start_and_end_times["end_in_seconds"]
|
||||
|
||||
if task_model.state == "COMPLETED" or task_failed:
|
||||
# let failed tasks raise and we will log the event then
|
||||
if task_model.state == "COMPLETED":
|
||||
event_type = ProcessInstanceEventType.task_completed.value
|
||||
if task_failed or task_model.state == TaskState.ERROR:
|
||||
event_type = ProcessInstanceEventType.task_failed.value
|
||||
|
||||
# FIXME: some failed tasks will currently not have either timestamp since we only hook into spiff when tasks complete
|
||||
# which script tasks execute when READY.
|
||||
timestamp = task_model.end_in_seconds or task_model.start_in_seconds or time.time()
|
||||
process_instance_event = ProcessInstanceEventModel(
|
||||
task_guid=task_model.guid,
|
||||
process_instance_id=self.process_instance.id,
|
||||
event_type=event_type,
|
||||
timestamp=timestamp,
|
||||
)
|
||||
process_instance_event, _process_instance_error_detail = TaskService.add_event_to_process_instance(self.process_instance, event_type, task_guid=task_model.guid, timestamp=timestamp, add_to_db_session=False)
|
||||
self.process_instance_events[task_model.guid] = process_instance_event
|
||||
|
||||
self.update_bpmn_process(spiff_task.workflow, bpmn_process)
|
||||
|
@ -607,16 +598,24 @@ class TaskService:
|
|||
task_guid: Optional[str] = None,
|
||||
user_id: Optional[int] = None,
|
||||
exception: Optional[Exception] = None,
|
||||
) -> None:
|
||||
timestamp: Optional[float] = None,
|
||||
add_to_db_session: Optional[bool] = True,
|
||||
) -> Tuple[ProcessInstanceEventModel, Optional[ProcessInstanceErrorDetailModel]]:
|
||||
if user_id is None and hasattr(g, "user") and g.user:
|
||||
user_id = g.user.id
|
||||
if timestamp is None:
|
||||
timestamp = time.time()
|
||||
|
||||
process_instance_event = ProcessInstanceEventModel(
|
||||
process_instance_id=process_instance.id, event_type=event_type, timestamp=time.time(), user_id=user_id
|
||||
process_instance_id=process_instance.id, event_type=event_type, timestamp=timestamp, user_id=user_id
|
||||
)
|
||||
if task_guid:
|
||||
process_instance_event.task_guid = task_guid
|
||||
db.session.add(process_instance_event)
|
||||
|
||||
if add_to_db_session:
|
||||
db.session.add(process_instance_event)
|
||||
|
||||
process_instance_error_detail = None
|
||||
if exception is not None:
|
||||
# truncate to avoid database errors on large values. We observed that text in mysql is 65K.
|
||||
stacktrace = traceback.format_exc().split("\n")
|
||||
|
@ -641,4 +640,7 @@ class TaskService:
|
|||
task_trace=task_trace,
|
||||
task_offset=task_offset,
|
||||
)
|
||||
db.session.add(process_instance_error_detail)
|
||||
|
||||
if add_to_db_session:
|
||||
db.session.add(process_instance_error_detail)
|
||||
return (process_instance_event, process_instance_error_detail)
|
||||
|
|
|
@ -1,5 +1,9 @@
|
|||
from __future__ import annotations
|
||||
import copy
|
||||
import time
|
||||
from abc import abstractmethod
|
||||
from typing import Type, TypeVar
|
||||
from SpiffWorkflow.exceptions import WorkflowTaskException # type: ignore
|
||||
from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventType
|
||||
from typing import Callable
|
||||
from typing import Optional
|
||||
|
@ -28,21 +32,69 @@ from spiffworkflow_backend.services.task_service import StartAndEndTimes
|
|||
from spiffworkflow_backend.services.task_service import TaskService
|
||||
|
||||
|
||||
|
||||
|
||||
class WorkflowExecutionServiceError(WorkflowTaskException): # type: ignore
|
||||
@classmethod
|
||||
def from_workflow_task_exception(
|
||||
cls,
|
||||
workflow_task_exception: WorkflowTaskException,
|
||||
) -> WorkflowExecutionServiceError:
|
||||
return cls(
|
||||
error_msg=str(workflow_task_exception),
|
||||
task=workflow_task_exception.task,
|
||||
exception=workflow_task_exception,
|
||||
line_number=workflow_task_exception.line_number,
|
||||
offset=workflow_task_exception.offset,
|
||||
error_line=workflow_task_exception.error_line,
|
||||
)
|
||||
|
||||
|
||||
class ExecutionStrategyNotConfiguredError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class EngineStepDelegate:
|
||||
"""Interface of sorts for a concrete engine step delegate."""
|
||||
|
||||
@abstractmethod
|
||||
def will_complete_task(self, spiff_task: SpiffTask) -> None:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def did_complete_task(self, spiff_task: SpiffTask) -> None:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def save(self, bpmn_process_instance: BpmnWorkflow, commit: bool = False) -> None:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def after_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> None:
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def on_exception(self, bpmn_process_instance: BpmnWorkflow) -> None:
|
||||
pass
|
||||
|
||||
|
||||
class ExecutionStrategy:
|
||||
"""Interface of sorts for a concrete execution strategy."""
|
||||
|
||||
def __init__(self, delegate: EngineStepDelegate):
|
||||
"""__init__."""
|
||||
self.delegate = delegate
|
||||
|
||||
@abstractmethod
|
||||
def spiff_run(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None:
|
||||
pass
|
||||
|
||||
def on_exception(self, bpmn_process_instance: BpmnWorkflow) -> None:
|
||||
self.delegate.on_exception(bpmn_process_instance)
|
||||
|
||||
def save(self, bpmn_process_instance: BpmnWorkflow) -> None:
|
||||
self.delegate.save(bpmn_process_instance)
|
||||
|
||||
|
||||
class TaskModelSavingDelegate(EngineStepDelegate):
|
||||
"""Engine step delegate that takes care of saving a task model to the database.
|
||||
|
@ -104,29 +156,12 @@ class TaskModelSavingDelegate(EngineStepDelegate):
|
|||
self.secondary_engine_step_delegate.did_complete_task(spiff_task)
|
||||
|
||||
def save(self, bpmn_process_instance: BpmnWorkflow, _commit: bool = True) -> None:
|
||||
script_engine = bpmn_process_instance.script_engine
|
||||
if hasattr(script_engine, "failing_spiff_task") and script_engine.failing_spiff_task is not None:
|
||||
failing_spiff_task = script_engine.failing_spiff_task
|
||||
self.task_service.update_task_model_with_spiff_task(failing_spiff_task, task_failed=True)
|
||||
self.task_service.process_spiff_task_parent_subprocess_tasks(failing_spiff_task)
|
||||
self.task_service.process_spiff_task_children(failing_spiff_task)
|
||||
|
||||
self.task_service.save_objects_to_database()
|
||||
|
||||
if self.secondary_engine_step_delegate:
|
||||
self.secondary_engine_step_delegate.save(bpmn_process_instance, commit=False)
|
||||
db.session.commit()
|
||||
|
||||
def _add_children(self, spiff_task: SpiffTask) -> None:
|
||||
for child_spiff_task in spiff_task.children:
|
||||
self.spiff_tasks_to_process.add(child_spiff_task.id)
|
||||
self._add_children(child_spiff_task)
|
||||
|
||||
def _add_parents(self, spiff_task: SpiffTask) -> None:
|
||||
if spiff_task.parent and spiff_task.parent.task_spec.name != "Root":
|
||||
self.spiff_tasks_to_process.add(spiff_task.parent.id)
|
||||
self._add_parents(spiff_task.parent)
|
||||
|
||||
def after_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> None:
|
||||
if self._should_update_task_model():
|
||||
# NOTE: process-all-tasks: All tests pass with this but it's less efficient and would be nice to replace
|
||||
|
@ -187,6 +222,19 @@ class TaskModelSavingDelegate(EngineStepDelegate):
|
|||
# self.task_service.process_spiff_task_children(self.last_completed_spiff_task)
|
||||
# self.task_service.process_spiff_task_parent_subprocess_tasks(self.last_completed_spiff_task)
|
||||
|
||||
def on_exception(self, bpmn_process_instance: BpmnWorkflow) -> None:
|
||||
self.after_engine_steps(bpmn_process_instance)
|
||||
|
||||
def _add_children(self, spiff_task: SpiffTask) -> None:
|
||||
for child_spiff_task in spiff_task.children:
|
||||
self.spiff_tasks_to_process.add(child_spiff_task.id)
|
||||
self._add_children(child_spiff_task)
|
||||
|
||||
def _add_parents(self, spiff_task: SpiffTask) -> None:
|
||||
if spiff_task.parent and spiff_task.parent.task_spec.name != "Root":
|
||||
self.spiff_tasks_to_process.add(spiff_task.parent.id)
|
||||
self._add_parents(spiff_task.parent)
|
||||
|
||||
def _should_update_task_model(self) -> bool:
|
||||
"""We need to figure out if we have previously save task info on this process intance.
|
||||
|
||||
|
@ -196,20 +244,6 @@ class TaskModelSavingDelegate(EngineStepDelegate):
|
|||
return True
|
||||
|
||||
|
||||
class ExecutionStrategy:
|
||||
"""Interface of sorts for a concrete execution strategy."""
|
||||
|
||||
def __init__(self, delegate: EngineStepDelegate):
|
||||
"""__init__."""
|
||||
self.delegate = delegate
|
||||
|
||||
def spiff_run(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None:
|
||||
pass
|
||||
|
||||
def save(self, bpmn_process_instance: BpmnWorkflow) -> None:
|
||||
self.delegate.save(bpmn_process_instance)
|
||||
|
||||
|
||||
class GreedyExecutionStrategy(ExecutionStrategy):
|
||||
"""The common execution strategy. This will greedily run all engine steps without stopping."""
|
||||
|
||||
|
@ -329,8 +363,12 @@ class WorkflowExecutionService:
|
|||
|
||||
self.process_bpmn_messages()
|
||||
self.queue_waiting_receive_messages()
|
||||
except WorkflowTaskException as wte:
|
||||
TaskService.add_event_to_process_instance(self.process_instance_model, ProcessInstanceEventType.task_failed.value, exception=wte, task_guid=str(wte.task.id))
|
||||
self.execution_strategy.on_exception(self.bpmn_process_instance)
|
||||
raise WorkflowExecutionServiceError.from_workflow_task_exception(wte)
|
||||
except SpiffWorkflowException as swe:
|
||||
TaskService.add_event_to_process_instance(self.process_instance_model, ProcessInstanceEventType.task_failed.value, exception=swe, task_guid=str(swe.task.id))
|
||||
self.execution_strategy.on_exception(self.bpmn_process_instance)
|
||||
raise ApiError.from_workflow_exception("task_error", str(swe), swe) from swe
|
||||
|
||||
finally:
|
||||
|
|
|
@ -69,6 +69,7 @@ type OwnProps = {
|
|||
readyOrWaitingProcessInstanceTasks?: Task[] | null;
|
||||
completedProcessInstanceTasks?: Task[] | null;
|
||||
cancelledProcessInstanceTasks?: Task[] | null;
|
||||
erroredProcessInstanceTasks?: Task[] | null;
|
||||
saveDiagram?: (..._args: any[]) => any;
|
||||
onDeleteFile?: (..._args: any[]) => any;
|
||||
isPrimaryFile?: boolean;
|
||||
|
@ -96,6 +97,7 @@ export default function ReactDiagramEditor({
|
|||
readyOrWaitingProcessInstanceTasks,
|
||||
completedProcessInstanceTasks,
|
||||
cancelledProcessInstanceTasks,
|
||||
erroredProcessInstanceTasks,
|
||||
saveDiagram,
|
||||
onDeleteFile,
|
||||
isPrimaryFile,
|
||||
|
@ -457,6 +459,19 @@ export default function ReactDiagramEditor({
|
|||
);
|
||||
});
|
||||
}
|
||||
if (erroredProcessInstanceTasks) {
|
||||
const bpmnProcessIdentifiers = getBpmnProcessIdentifiers(
|
||||
canvas.getRootElement()
|
||||
);
|
||||
erroredProcessInstanceTasks.forEach((erroredTask) => {
|
||||
highlightBpmnIoElement(
|
||||
canvas,
|
||||
erroredTask,
|
||||
'errored-task-highlight',
|
||||
bpmnProcessIdentifiers
|
||||
);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
function displayDiagram(
|
||||
|
|
|
@ -146,6 +146,10 @@ code {
|
|||
fill: blue !important;
|
||||
opacity: .2;
|
||||
}
|
||||
.errored-task-highlight:not(.djs-connection) .djs-visual > :nth-child(1) {
|
||||
fill: red !important;
|
||||
opacity: .2;
|
||||
}
|
||||
|
||||
.accordion-item-label {
|
||||
vertical-align: middle;
|
||||
|
|
|
@ -59,6 +59,7 @@ export interface TaskIds {
|
|||
completed: Task[];
|
||||
readyOrWaiting: Task[];
|
||||
cancelled: Task[];
|
||||
errored: Task[];
|
||||
}
|
||||
|
||||
export interface ProcessInstanceTask {
|
||||
|
|
|
@ -235,6 +235,7 @@ export default function ProcessInstanceShow({ variant }: OwnProps) {
|
|||
completed: [],
|
||||
readyOrWaiting: [],
|
||||
cancelled: [],
|
||||
errored: [],
|
||||
};
|
||||
if (tasks) {
|
||||
tasks.forEach(function getUserTasksElement(task: Task) {
|
||||
|
@ -244,6 +245,8 @@ export default function ProcessInstanceShow({ variant }: OwnProps) {
|
|||
taskIds.readyOrWaiting.push(task);
|
||||
} else if (task.state === 'CANCELLED') {
|
||||
taskIds.cancelled.push(task);
|
||||
} else if (task.state === 'ERROR') {
|
||||
taskIds.errored.push(task);
|
||||
}
|
||||
return null;
|
||||
});
|
||||
|
@ -1159,6 +1162,7 @@ export default function ProcessInstanceShow({ variant }: OwnProps) {
|
|||
readyOrWaitingProcessInstanceTasks={taskIds.readyOrWaiting}
|
||||
completedProcessInstanceTasks={taskIds.completed}
|
||||
cancelledProcessInstanceTasks={taskIds.cancelled}
|
||||
erroredProcessInstanceTasks={taskIds.errored}
|
||||
diagramType="readonly"
|
||||
onElementClick={handleClickedDiagramTask}
|
||||
/>
|
||||
|
|
Loading…
Reference in New Issue