Bubble up unhandled error and escalation events at the end of the workflow (#2058)

This commit is contained in:
jbirddog 2024-08-26 14:15:19 -04:00 committed by GitHub
parent df58e28120
commit 8949073595
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 37 additions and 10 deletions

View File

@ -1,4 +1,4 @@
# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. # This file is automatically @generated by Poetry 1.8.1 and should not be changed by hand.
[[package]] [[package]]
name = "alembic" name = "alembic"
@ -2986,7 +2986,7 @@ doc = ["sphinx", "sphinx_rtd_theme"]
type = "git" type = "git"
url = "https://github.com/sartography/SpiffWorkflow" url = "https://github.com/sartography/SpiffWorkflow"
reference = "main" reference = "main"
resolved_reference = "14c84aa26d0485753ed8f174d69fb12a888c455a" resolved_reference = "579bac49ea3d3e52d3942c3c4bcda1ae4500285e"
[[package]] [[package]]
name = "spiffworkflow-connector-command" name = "spiffworkflow-connector-command"

View File

@ -62,6 +62,19 @@ class WorkflowExecutionServiceError(WorkflowTaskException): # type: ignore
error_line=workflow_task_exception.error_line, error_line=workflow_task_exception.error_line,
) )
@classmethod
def from_completion_with_unhandled_events(
cls,
task: SpiffTask,
unhandled_events: dict[str, list[Any]],
) -> WorkflowExecutionServiceError:
events = {k: [e.event_definition.code for e in v] for k, v in unhandled_events.items()}
return cls(
error_msg=f"The process completed with unhandled events: {events}",
task=task,
)
class ExecutionStrategyNotConfiguredError(Exception): class ExecutionStrategyNotConfiguredError(Exception):
pass pass
@ -533,7 +546,7 @@ class WorkflowExecutionService:
if self.bpmn_process_instance.is_completed(): if self.bpmn_process_instance.is_completed():
self.process_instance_completer(self.bpmn_process_instance) self.process_instance_completer(self.bpmn_process_instance)
self.process_bpmn_messages() self.process_bpmn_events()
self.queue_waiting_receive_messages() self.queue_waiting_receive_messages()
return task_runnability return task_runnability
except WorkflowTaskException as wte: except WorkflowTaskException as wte:
@ -585,14 +598,28 @@ class WorkflowExecutionService:
queued_to_run_at_in_seconds=queued_to_run_at_in_seconds, queued_to_run_at_in_seconds=queued_to_run_at_in_seconds,
) )
def process_bpmn_messages(self) -> None: def group_bpmn_events(self) -> dict[str, Any]:
# FIXE: get_events clears out the events so if we have other events we care about event_groups: dict[str, Any] = {}
# this will clear them out as well. for bpmn_event in self.bpmn_process_instance.get_events():
# Right now we only care about messages though. key = type(bpmn_event.event_definition).__name__
bpmn_events = self.bpmn_process_instance.get_events() if key not in event_groups:
event_groups[key] = []
event_groups[key].append(bpmn_event)
return event_groups
def process_bpmn_events(self) -> None:
bpmn_event_groups = self.group_bpmn_events()
message_events = bpmn_event_groups.pop(MessageEventDefinition.__name__, [])
if bpmn_event_groups and self.bpmn_process_instance.is_completed():
raise WorkflowExecutionServiceError.from_completion_with_unhandled_events(
self.bpmn_process_instance.last_task, bpmn_event_groups
)
self.process_bpmn_messages(message_events)
def process_bpmn_messages(self, bpmn_events: list[MessageEventDefinition]) -> None:
for bpmn_event in bpmn_events: for bpmn_event in bpmn_events:
if not isinstance(bpmn_event.event_definition, MessageEventDefinition):
continue
bpmn_message = bpmn_event.event_definition bpmn_message = bpmn_event.event_definition
message_instance = MessageInstanceModel( message_instance = MessageInstanceModel(
process_instance_id=self.process_instance_model.id, process_instance_id=self.process_instance_model.id,