properly save task and instance exceptions to the db and display in the frontend w/ burnettk

This commit is contained in:
jasquat 2023-04-20 11:49:34 -04:00
parent 08271da363
commit 1b65a277c4
12 changed files with 164 additions and 113 deletions

View File

@ -205,7 +205,6 @@ class ProcessInstanceApi:
next_task: Task | None,
process_model_identifier: str,
process_model_display_name: str,
completed_tasks: int,
updated_at_in_seconds: int,
) -> None:
"""__init__."""
@ -214,7 +213,6 @@ class ProcessInstanceApi:
self.next_task = next_task # The next task that requires user input.
self.process_model_identifier = process_model_identifier
self.process_model_display_name = process_model_display_name
self.completed_tasks = completed_tasks
self.updated_at_in_seconds = updated_at_in_seconds
@ -231,7 +229,6 @@ class ProcessInstanceApiSchema(Schema):
"next_task",
"process_model_identifier",
"process_model_display_name",
"completed_tasks",
"updated_at_in_seconds",
]
unknown = INCLUDE
@ -248,7 +245,6 @@ class ProcessInstanceApiSchema(Schema):
"next_task",
"process_model_identifier",
"process_model_display_name",
"completed_tasks",
"updated_at_in_seconds",
]
filtered_fields = {key: data[key] for key in keys}

View File

@ -124,14 +124,12 @@ def process_instance_run(
processor = None
try:
processor = ProcessInstanceProcessor(process_instance)
processor.do_engine_steps(save=True)
processor = ProcessInstanceService.run_process_intance_with_processor(process_instance)
except (
ApiError,
ProcessInstanceIsNotEnqueuedError,
ProcessInstanceIsAlreadyLockedError,
) as e:
# import pdb; pdb.set_trace()
ErrorHandlingService.handle_error(process_instance, e)
raise e
except Exception as e:
@ -139,7 +137,6 @@ def process_instance_run(
# FIXME: this is going to point someone to the wrong task - it's misinformation for errors in sub-processes.
# we need to recurse through all last tasks if the last task is a call activity or subprocess.
if processor is not None:
# import pdb; pdb.set_trace()
task = processor.bpmn_process_instance.last_task
raise ApiError.from_task(
error_code="unknown_exception",
@ -152,11 +149,17 @@ def process_instance_run(
if not current_app.config["SPIFFWORKFLOW_BACKEND_RUN_BACKGROUND_SCHEDULER"]:
MessageService.correlate_all_message_instances()
process_instance_api = ProcessInstanceService.processor_to_process_instance_api(processor)
process_instance_data = processor.get_data()
process_instance_metadata = ProcessInstanceApiSchema().dump(process_instance_api)
process_instance_metadata["data"] = process_instance_data
return Response(json.dumps(process_instance_metadata), status=200, mimetype="application/json")
# for mypy
if processor is not None:
process_instance_api = ProcessInstanceService.processor_to_process_instance_api(processor)
process_instance_data = processor.get_data()
process_instance_metadata = ProcessInstanceApiSchema().dump(process_instance_api)
process_instance_metadata["data"] = process_instance_data
return Response(json.dumps(process_instance_metadata), status=200, mimetype="application/json")
# FIXME: this should never happen currently but it'd be ideal to always do this
# currently though it does not return next task so it cannnot be used to take the user to the next human task
return make_response(jsonify(process_instance), 200)
def process_instance_terminate(

View File

@ -19,10 +19,10 @@ class ErrorHandlingService:
MESSAGE_NAME = "SystemErrorMessage"
@classmethod
def handle_error(cls, process_instance: ProcessInstanceModel, error: Union[ApiError, Exception]) -> None:
def handle_error(cls, process_instance: ProcessInstanceModel, error: Exception) -> None:
"""On unhandled exceptions, set instance.status based on model.fault_or_suspend_on_exception."""
process_model = ProcessModelService.get_process_model(process_instance.process_model_identifier)
cls._update_process_instance_in_database(process_instance, error, process_model.fault_or_suspend_on_exception)
cls._update_process_instance_in_database(process_instance, process_model.fault_or_suspend_on_exception)
# Second, send a bpmn message out, but only if an exception notification address is provided
# This will create a new Send Message with correlation keys on the recipients and the message
@ -35,9 +35,7 @@ class ErrorHandlingService:
current_app.logger.error(e)
@classmethod
def _update_process_instance_in_database(cls, process_instance: ProcessInstanceModel, error: Union[ApiError, Exception], fault_or_suspend_on_exception: str) -> None:
TaskService.add_event_to_process_instance(process_instance, ProcessInstanceEventType.process_instance_error.value, exception=error)
def _update_process_instance_in_database(cls, process_instance: ProcessInstanceModel, fault_or_suspend_on_exception: str) -> None:
# First, suspend or fault the instance
if fault_or_suspend_on_exception == "suspend":
cls._set_instance_status(
@ -55,7 +53,7 @@ class ErrorHandlingService:
@staticmethod
def _handle_system_notification(
error: Union[ApiError, Exception],
error: Exception,
process_model: ProcessModelInfo,
process_instance: ProcessInstanceModel,
) -> None:

View File

@ -108,6 +108,7 @@ from spiffworkflow_backend.services.task_service import JsonDataDict
from spiffworkflow_backend.services.task_service import TaskService
from spiffworkflow_backend.services.user_service import UserService
from spiffworkflow_backend.services.workflow_execution_service import (
ExecutionStrategyNotConfiguredError,
execution_strategy_named,
)
from spiffworkflow_backend.services.workflow_execution_service import (
@ -162,9 +163,10 @@ class BoxedTaskDataBasedScriptEngineEnvironment(BoxedTaskDataEnvironment): # ty
script: str,
context: Dict[str, Any],
external_methods: Optional[Dict[str, Any]] = None,
) -> None:
) -> bool:
super().execute(script, context, external_methods)
self._last_result = context
return True
def user_defined_state(self, external_methods: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
return {}
@ -217,7 +219,7 @@ class NonTaskDataBasedScriptEngineEnvironment(BasePythonScriptEngineEnvironment)
script: str,
context: Dict[str, Any],
external_methods: Optional[Dict[str, Any]] = None,
) -> None:
) -> bool:
# TODO: once integrated look at the tests that fail without Box
# context is task.data
Box.convert_to_box(context)
@ -239,6 +241,7 @@ class NonTaskDataBasedScriptEngineEnvironment(BasePythonScriptEngineEnvironment)
# the task data needs to be updated with the current state so data references can be resolved properly.
# the state will be removed later once the task is completed.
context.update(self.state)
return True
def user_defined_state(self, external_methods: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
keys_to_filter = self.non_user_defined_keys
@ -318,13 +321,7 @@ class CustomBpmnScriptEngine(PythonScriptEngine): # type: ignore
# This will overwrite the standard builtins
default_globals.update(safe_globals)
default_globals["__builtins__"]["__import__"] = _import
environment = CustomScriptEngineEnvironment(default_globals)
# right now spiff is executing script tasks on ready so doing this
# so we know when something fails and we can save it to our database.
self.failing_spiff_task: Optional[SpiffTask] = None
super().__init__(environment=environment)
def __get_augment_methods(self, task: Optional[SpiffTask]) -> Dict[str, Callable]:
@ -351,7 +348,6 @@ class CustomBpmnScriptEngine(PythonScriptEngine): # type: ignore
expression: str,
external_methods: Optional[dict[str, Any]] = None,
) -> Any:
"""Evaluate."""
return self._evaluate(expression, task.data, task, external_methods)
def _evaluate(
@ -361,7 +357,6 @@ class CustomBpmnScriptEngine(PythonScriptEngine): # type: ignore
task: Optional[SpiffTask] = None,
external_methods: Optional[Dict[str, Any]] = None,
) -> Any:
"""_evaluate."""
methods = self.__get_augment_methods(task)
if external_methods:
methods.update(external_methods)
@ -381,17 +376,15 @@ class CustomBpmnScriptEngine(PythonScriptEngine): # type: ignore
exception=exception,
) from exception
def execute(self, task: SpiffTask, script: str, external_methods: Any = None) -> None:
"""Execute."""
def execute(self, task: SpiffTask, script: str, external_methods: Any = None) -> bool:
try:
# reset failing task just in case
self.failing_spiff_task = None
methods = self.__get_augment_methods(task)
if external_methods:
methods.update(external_methods)
super().execute(task, script, methods)
return True
except WorkflowException as e:
self.failing_spiff_task = task
raise e
except Exception as e:
raise self.create_task_exec_exception(task, script, e) from e
@ -402,7 +395,6 @@ class CustomBpmnScriptEngine(PythonScriptEngine): # type: ignore
operation_params: Dict[str, Any],
task_data: Dict[str, Any],
) -> Any:
"""CallService."""
return ServiceTaskDelegate.call_connector(operation_name, operation_params, task_data)
@ -1124,14 +1116,10 @@ class ProcessInstanceProcessor:
"""Saves the current state of this processor to the database."""
self.process_instance_model.spiff_serializer_version = self.SERIALIZER_VERSION
complete_states = [TaskState.CANCELLED, TaskState.COMPLETED]
user_tasks = list(self.get_all_user_tasks())
self.process_instance_model.status = self.get_status().value
current_app.logger.debug(
f"the_status: {self.process_instance_model.status} for instance {self.process_instance_model.id}"
)
self.process_instance_model.total_tasks = len(user_tasks)
self.process_instance_model.completed_tasks = sum(1 for t in user_tasks if t.state in complete_states)
if self.process_instance_model.start_in_seconds is None:
self.process_instance_model.start_in_seconds = round(time.time())
@ -1693,6 +1681,8 @@ class ProcessInstanceProcessor:
if execution_strategy_name is None:
execution_strategy_name = current_app.config["SPIFFWORKFLOW_BACKEND_ENGINE_STEP_DEFAULT_STRATEGY_WEB"]
if execution_strategy_name is None:
raise ExecutionStrategyNotConfiguredError("SPIFFWORKFLOW_BACKEND_ENGINE_STEP_DEFAULT_STRATEGY_WEB has not been set")
execution_strategy = execution_strategy_named(execution_strategy_name, task_model_delegate)
execution_service = WorkflowExecutionService(
@ -1702,16 +1692,7 @@ class ProcessInstanceProcessor:
self._script_engine.environment.finalize_result,
self.save,
)
try:
execution_service.run(exit_at, save)
finally:
# clear out failling spiff tasks here since the ProcessInstanceProcessor creates an instance of the
# script engine on a class variable.
if (
hasattr(self._script_engine, "failing_spiff_task")
and self._script_engine.failing_spiff_task is not None
):
self._script_engine.failing_spiff_task = None
execution_service.run(exit_at, save)
@classmethod
def get_tasks_with_data(cls, bpmn_process_instance: BpmnWorkflow) -> List[SpiffTask]:
@ -1932,7 +1913,6 @@ class ProcessInstanceProcessor:
return [t for t in all_tasks if not self.bpmn_process_instance._is_engine_task(t.task_spec)]
def get_all_completed_tasks(self) -> list[SpiffTask]:
"""Get_all_completed_tasks."""
all_tasks = self.bpmn_process_instance.get_tasks(TaskState.ANY_MASK)
return [
t

View File

@ -1,5 +1,6 @@
import contextlib
import time
from spiffworkflow_backend.services.workflow_execution_service import WorkflowExecutionServiceError
from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventType
from spiffworkflow_backend.services.task_service import TaskService
from typing import Generator
@ -99,7 +100,10 @@ class ProcessInstanceQueueService:
except Exception as ex:
process_instance.status = ProcessInstanceStatus.error.value
db.session.add(process_instance)
TaskService.add_event_to_process_instance(process_instance, ProcessInstanceEventType.process_instance_error.value, exception=ex)
# these events are handled in the WorkflowExecutionService.
# that is, we don't need to add error_detail records here, etc.
if not isinstance(ex, WorkflowExecutionServiceError):
TaskService.add_event_to_process_instance(process_instance, ProcessInstanceEventType.process_instance_error.value, exception=ex)
db.session.commit()
raise ex
finally:

View File

@ -113,20 +113,9 @@ class ProcessInstanceService:
.all()
)
for process_instance in records:
current_app.logger.info(f"Processing process_instance {process_instance.id}")
try:
current_app.logger.info(f"Processing process_instance {process_instance.id}")
with ProcessInstanceQueueService.dequeued(process_instance):
processor = ProcessInstanceProcessor(process_instance)
if cls.can_optimistically_skip(processor, status_value):
current_app.logger.info(f"Optimistically skipped process_instance {process_instance.id}")
continue
db.session.refresh(process_instance)
if process_instance.status == status_value:
execution_strategy_name = current_app.config[
"SPIFFWORKFLOW_BACKEND_ENGINE_STEP_DEFAULT_STRATEGY_BACKGROUND"
]
processor.do_engine_steps(save=True, execution_strategy_name=execution_strategy_name)
cls.run_process_intance_with_processor(process_instance, status_value=status_value)
except ProcessInstanceIsAlreadyLockedError:
continue
except Exception as e:
@ -137,6 +126,24 @@ class ProcessInstanceService:
)
current_app.logger.error(error_message)
@classmethod
def run_process_intance_with_processor(cls, process_instance: ProcessInstanceModel, status_value: Optional[str] = None) -> Optional[ProcessInstanceProcessor]:
processor = None
with ProcessInstanceQueueService.dequeued(process_instance):
processor = ProcessInstanceProcessor(process_instance)
if status_value and cls.can_optimistically_skip(processor, status_value):
current_app.logger.info(f"Optimistically skipped process_instance {process_instance.id}")
return None
db.session.refresh(process_instance)
if status_value is None or process_instance.status == status_value:
execution_strategy_name = current_app.config[
"SPIFFWORKFLOW_BACKEND_ENGINE_STEP_DEFAULT_STRATEGY_BACKGROUND"
]
processor.do_engine_steps(save=True, execution_strategy_name=execution_strategy_name)
return processor
@staticmethod
def processor_to_process_instance_api(
processor: ProcessInstanceProcessor, next_task: None = None
@ -155,7 +162,6 @@ class ProcessInstanceService:
next_task=None,
process_model_identifier=processor.process_model_identifier,
process_model_display_name=processor.process_model_display_name,
completed_tasks=processor.process_instance_model.completed_tasks,
updated_at_in_seconds=processor.process_instance_model.updated_at_in_seconds,
)

View File

@ -3,6 +3,7 @@ import json
from spiffworkflow_backend.exceptions.api_error import ApiError
from SpiffWorkflow.exceptions import WorkflowTaskException # type: ignore
from flask import g
from spiffworkflow_backend.models import process_instance_error_detail
from spiffworkflow_backend.models.process_instance_error_detail import ProcessInstanceErrorDetailModel
import traceback
import time
@ -117,7 +118,6 @@ class TaskService:
def update_task_model_with_spiff_task(
self,
spiff_task: SpiffTask,
task_failed: bool = False,
start_and_end_times: Optional[StartAndEndTimes] = None,
) -> TaskModel:
new_bpmn_process = None
@ -158,20 +158,11 @@ class TaskService:
task_model.start_in_seconds = start_and_end_times["start_in_seconds"]
task_model.end_in_seconds = start_and_end_times["end_in_seconds"]
if task_model.state == "COMPLETED" or task_failed:
# let failed tasks raise and we will log the event then
if task_model.state == "COMPLETED":
event_type = ProcessInstanceEventType.task_completed.value
if task_failed or task_model.state == TaskState.ERROR:
event_type = ProcessInstanceEventType.task_failed.value
# FIXME: some failed tasks will currently not have either timestamp since we only hook into spiff when tasks complete
# which script tasks execute when READY.
timestamp = task_model.end_in_seconds or task_model.start_in_seconds or time.time()
process_instance_event = ProcessInstanceEventModel(
task_guid=task_model.guid,
process_instance_id=self.process_instance.id,
event_type=event_type,
timestamp=timestamp,
)
process_instance_event, _process_instance_error_detail = TaskService.add_event_to_process_instance(self.process_instance, event_type, task_guid=task_model.guid, timestamp=timestamp, add_to_db_session=False)
self.process_instance_events[task_model.guid] = process_instance_event
self.update_bpmn_process(spiff_task.workflow, bpmn_process)
@ -607,16 +598,24 @@ class TaskService:
task_guid: Optional[str] = None,
user_id: Optional[int] = None,
exception: Optional[Exception] = None,
) -> None:
timestamp: Optional[float] = None,
add_to_db_session: Optional[bool] = True,
) -> Tuple[ProcessInstanceEventModel, Optional[ProcessInstanceErrorDetailModel]]:
if user_id is None and hasattr(g, "user") and g.user:
user_id = g.user.id
if timestamp is None:
timestamp = time.time()
process_instance_event = ProcessInstanceEventModel(
process_instance_id=process_instance.id, event_type=event_type, timestamp=time.time(), user_id=user_id
process_instance_id=process_instance.id, event_type=event_type, timestamp=timestamp, user_id=user_id
)
if task_guid:
process_instance_event.task_guid = task_guid
db.session.add(process_instance_event)
if add_to_db_session:
db.session.add(process_instance_event)
process_instance_error_detail = None
if exception is not None:
# truncate to avoid database errors on large values. We observed that text in mysql is 65K.
stacktrace = traceback.format_exc().split("\n")
@ -641,4 +640,7 @@ class TaskService:
task_trace=task_trace,
task_offset=task_offset,
)
db.session.add(process_instance_error_detail)
if add_to_db_session:
db.session.add(process_instance_error_detail)
return (process_instance_event, process_instance_error_detail)

View File

@ -1,5 +1,9 @@
from __future__ import annotations
import copy
import time
from abc import abstractmethod
from typing import Type, TypeVar
from SpiffWorkflow.exceptions import WorkflowTaskException # type: ignore
from spiffworkflow_backend.models.process_instance_event import ProcessInstanceEventType
from typing import Callable
from typing import Optional
@ -28,21 +32,69 @@ from spiffworkflow_backend.services.task_service import StartAndEndTimes
from spiffworkflow_backend.services.task_service import TaskService
class WorkflowExecutionServiceError(WorkflowTaskException): # type: ignore
@classmethod
def from_workflow_task_exception(
cls,
workflow_task_exception: WorkflowTaskException,
) -> WorkflowExecutionServiceError:
return cls(
error_msg=str(workflow_task_exception),
task=workflow_task_exception.task,
exception=workflow_task_exception,
line_number=workflow_task_exception.line_number,
offset=workflow_task_exception.offset,
error_line=workflow_task_exception.error_line,
)
class ExecutionStrategyNotConfiguredError(Exception):
pass
class EngineStepDelegate:
"""Interface of sorts for a concrete engine step delegate."""
@abstractmethod
def will_complete_task(self, spiff_task: SpiffTask) -> None:
pass
@abstractmethod
def did_complete_task(self, spiff_task: SpiffTask) -> None:
pass
@abstractmethod
def save(self, bpmn_process_instance: BpmnWorkflow, commit: bool = False) -> None:
pass
@abstractmethod
def after_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> None:
pass
@abstractmethod
def on_exception(self, bpmn_process_instance: BpmnWorkflow) -> None:
pass
class ExecutionStrategy:
"""Interface of sorts for a concrete execution strategy."""
def __init__(self, delegate: EngineStepDelegate):
"""__init__."""
self.delegate = delegate
@abstractmethod
def spiff_run(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None:
pass
def on_exception(self, bpmn_process_instance: BpmnWorkflow) -> None:
self.delegate.on_exception(bpmn_process_instance)
def save(self, bpmn_process_instance: BpmnWorkflow) -> None:
self.delegate.save(bpmn_process_instance)
class TaskModelSavingDelegate(EngineStepDelegate):
"""Engine step delegate that takes care of saving a task model to the database.
@ -104,29 +156,12 @@ class TaskModelSavingDelegate(EngineStepDelegate):
self.secondary_engine_step_delegate.did_complete_task(spiff_task)
def save(self, bpmn_process_instance: BpmnWorkflow, _commit: bool = True) -> None:
script_engine = bpmn_process_instance.script_engine
if hasattr(script_engine, "failing_spiff_task") and script_engine.failing_spiff_task is not None:
failing_spiff_task = script_engine.failing_spiff_task
self.task_service.update_task_model_with_spiff_task(failing_spiff_task, task_failed=True)
self.task_service.process_spiff_task_parent_subprocess_tasks(failing_spiff_task)
self.task_service.process_spiff_task_children(failing_spiff_task)
self.task_service.save_objects_to_database()
if self.secondary_engine_step_delegate:
self.secondary_engine_step_delegate.save(bpmn_process_instance, commit=False)
db.session.commit()
def _add_children(self, spiff_task: SpiffTask) -> None:
for child_spiff_task in spiff_task.children:
self.spiff_tasks_to_process.add(child_spiff_task.id)
self._add_children(child_spiff_task)
def _add_parents(self, spiff_task: SpiffTask) -> None:
if spiff_task.parent and spiff_task.parent.task_spec.name != "Root":
self.spiff_tasks_to_process.add(spiff_task.parent.id)
self._add_parents(spiff_task.parent)
def after_engine_steps(self, bpmn_process_instance: BpmnWorkflow) -> None:
if self._should_update_task_model():
# NOTE: process-all-tasks: All tests pass with this but it's less efficient and would be nice to replace
@ -187,6 +222,19 @@ class TaskModelSavingDelegate(EngineStepDelegate):
# self.task_service.process_spiff_task_children(self.last_completed_spiff_task)
# self.task_service.process_spiff_task_parent_subprocess_tasks(self.last_completed_spiff_task)
def on_exception(self, bpmn_process_instance: BpmnWorkflow) -> None:
self.after_engine_steps(bpmn_process_instance)
def _add_children(self, spiff_task: SpiffTask) -> None:
for child_spiff_task in spiff_task.children:
self.spiff_tasks_to_process.add(child_spiff_task.id)
self._add_children(child_spiff_task)
def _add_parents(self, spiff_task: SpiffTask) -> None:
if spiff_task.parent and spiff_task.parent.task_spec.name != "Root":
self.spiff_tasks_to_process.add(spiff_task.parent.id)
self._add_parents(spiff_task.parent)
def _should_update_task_model(self) -> bool:
"""We need to figure out if we have previously save task info on this process intance.
@ -196,20 +244,6 @@ class TaskModelSavingDelegate(EngineStepDelegate):
return True
class ExecutionStrategy:
"""Interface of sorts for a concrete execution strategy."""
def __init__(self, delegate: EngineStepDelegate):
"""__init__."""
self.delegate = delegate
def spiff_run(self, bpmn_process_instance: BpmnWorkflow, exit_at: None = None) -> None:
pass
def save(self, bpmn_process_instance: BpmnWorkflow) -> None:
self.delegate.save(bpmn_process_instance)
class GreedyExecutionStrategy(ExecutionStrategy):
"""The common execution strategy. This will greedily run all engine steps without stopping."""
@ -329,8 +363,12 @@ class WorkflowExecutionService:
self.process_bpmn_messages()
self.queue_waiting_receive_messages()
except WorkflowTaskException as wte:
TaskService.add_event_to_process_instance(self.process_instance_model, ProcessInstanceEventType.task_failed.value, exception=wte, task_guid=str(wte.task.id))
self.execution_strategy.on_exception(self.bpmn_process_instance)
raise WorkflowExecutionServiceError.from_workflow_task_exception(wte)
except SpiffWorkflowException as swe:
TaskService.add_event_to_process_instance(self.process_instance_model, ProcessInstanceEventType.task_failed.value, exception=swe, task_guid=str(swe.task.id))
self.execution_strategy.on_exception(self.bpmn_process_instance)
raise ApiError.from_workflow_exception("task_error", str(swe), swe) from swe
finally:

View File

@ -69,6 +69,7 @@ type OwnProps = {
readyOrWaitingProcessInstanceTasks?: Task[] | null;
completedProcessInstanceTasks?: Task[] | null;
cancelledProcessInstanceTasks?: Task[] | null;
erroredProcessInstanceTasks?: Task[] | null;
saveDiagram?: (..._args: any[]) => any;
onDeleteFile?: (..._args: any[]) => any;
isPrimaryFile?: boolean;
@ -96,6 +97,7 @@ export default function ReactDiagramEditor({
readyOrWaitingProcessInstanceTasks,
completedProcessInstanceTasks,
cancelledProcessInstanceTasks,
erroredProcessInstanceTasks,
saveDiagram,
onDeleteFile,
isPrimaryFile,
@ -457,6 +459,19 @@ export default function ReactDiagramEditor({
);
});
}
if (erroredProcessInstanceTasks) {
const bpmnProcessIdentifiers = getBpmnProcessIdentifiers(
canvas.getRootElement()
);
erroredProcessInstanceTasks.forEach((erroredTask) => {
highlightBpmnIoElement(
canvas,
erroredTask,
'errored-task-highlight',
bpmnProcessIdentifiers
);
});
}
}
function displayDiagram(

View File

@ -146,6 +146,10 @@ code {
fill: blue !important;
opacity: .2;
}
.errored-task-highlight:not(.djs-connection) .djs-visual > :nth-child(1) {
fill: red !important;
opacity: .2;
}
.accordion-item-label {
vertical-align: middle;

View File

@ -59,6 +59,7 @@ export interface TaskIds {
completed: Task[];
readyOrWaiting: Task[];
cancelled: Task[];
errored: Task[];
}
export interface ProcessInstanceTask {

View File

@ -235,6 +235,7 @@ export default function ProcessInstanceShow({ variant }: OwnProps) {
completed: [],
readyOrWaiting: [],
cancelled: [],
errored: [],
};
if (tasks) {
tasks.forEach(function getUserTasksElement(task: Task) {
@ -244,6 +245,8 @@ export default function ProcessInstanceShow({ variant }: OwnProps) {
taskIds.readyOrWaiting.push(task);
} else if (task.state === 'CANCELLED') {
taskIds.cancelled.push(task);
} else if (task.state === 'ERROR') {
taskIds.errored.push(task);
}
return null;
});
@ -1159,6 +1162,7 @@ export default function ProcessInstanceShow({ variant }: OwnProps) {
readyOrWaitingProcessInstanceTasks={taskIds.readyOrWaiting}
completedProcessInstanceTasks={taskIds.completed}
cancelledProcessInstanceTasks={taskIds.cancelled}
erroredProcessInstanceTasks={taskIds.errored}
diagramType="readonly"
onElementClick={handleClickedDiagramTask}
/>