diff --git a/spiffworkflow-backend/poetry.lock b/spiffworkflow-backend/poetry.lock index bafd38151..f3cd51d12 100644 --- a/spiffworkflow-backend/poetry.lock +++ b/spiffworkflow-backend/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.1 and should not be changed by hand. [[package]] name = "alembic" @@ -2986,7 +2986,7 @@ doc = ["sphinx", "sphinx_rtd_theme"] type = "git" url = "https://github.com/sartography/SpiffWorkflow" reference = "main" -resolved_reference = "14c84aa26d0485753ed8f174d69fb12a888c455a" +resolved_reference = "579bac49ea3d3e52d3942c3c4bcda1ae4500285e" [[package]] name = "spiffworkflow-connector-command" diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py index bdc911b6b..3357ed8e3 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py @@ -62,6 +62,19 @@ class WorkflowExecutionServiceError(WorkflowTaskException): # type: ignore error_line=workflow_task_exception.error_line, ) + @classmethod + def from_completion_with_unhandled_events( + cls, + task: SpiffTask, + unhandled_events: dict[str, list[Any]], + ) -> WorkflowExecutionServiceError: + events = {k: [e.event_definition.code for e in v] for k, v in unhandled_events.items()} + + return cls( + error_msg=f"The process completed with unhandled events: {events}", + task=task, + ) + class ExecutionStrategyNotConfiguredError(Exception): pass @@ -533,7 +546,7 @@ class WorkflowExecutionService: if self.bpmn_process_instance.is_completed(): self.process_instance_completer(self.bpmn_process_instance) - self.process_bpmn_messages() + self.process_bpmn_events() self.queue_waiting_receive_messages() return task_runnability except WorkflowTaskException as wte: @@ -585,14 +598,28 @@ class WorkflowExecutionService: queued_to_run_at_in_seconds=queued_to_run_at_in_seconds, ) - def process_bpmn_messages(self) -> None: - # FIXE: get_events clears out the events so if we have other events we care about - # this will clear them out as well. - # Right now we only care about messages though. - bpmn_events = self.bpmn_process_instance.get_events() + def group_bpmn_events(self) -> dict[str, Any]: + event_groups: dict[str, Any] = {} + for bpmn_event in self.bpmn_process_instance.get_events(): + key = type(bpmn_event.event_definition).__name__ + if key not in event_groups: + event_groups[key] = [] + event_groups[key].append(bpmn_event) + return event_groups + + def process_bpmn_events(self) -> None: + bpmn_event_groups = self.group_bpmn_events() + message_events = bpmn_event_groups.pop(MessageEventDefinition.__name__, []) + + if bpmn_event_groups and self.bpmn_process_instance.is_completed(): + raise WorkflowExecutionServiceError.from_completion_with_unhandled_events( + self.bpmn_process_instance.last_task, bpmn_event_groups + ) + + self.process_bpmn_messages(message_events) + + def process_bpmn_messages(self, bpmn_events: list[MessageEventDefinition]) -> None: for bpmn_event in bpmn_events: - if not isinstance(bpmn_event.event_definition, MessageEventDefinition): - continue bpmn_message = bpmn_event.event_definition message_instance = MessageInstanceModel( process_instance_id=self.process_instance_model.id,