From c4625b8407d366d4de6fd4b4d4cefd9ca6fe7ec7 Mon Sep 17 00:00:00 2001 From: jasquat Date: Fri, 16 Dec 2022 13:23:00 -0500 Subject: [PATCH] Squashed 'SpiffWorkflow/' changes from ffb16867..841bd630 841bd630 Merge pull request #273 from sartography/bugfix/catch-timers-in-event-gateways 103c70d0 hacks to handle timer events like regular events git-subtree-dir: SpiffWorkflow git-subtree-split: 841bd63017bb1d92858456393f144b4e5b23c994 --- .../bpmn/events/EventBasedGatewayTest.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/tests/SpiffWorkflow/bpmn/events/EventBasedGatewayTest.py b/tests/SpiffWorkflow/bpmn/events/EventBasedGatewayTest.py index 692f4194..6e549784 100644 --- a/tests/SpiffWorkflow/bpmn/events/EventBasedGatewayTest.py +++ b/tests/SpiffWorkflow/bpmn/events/EventBasedGatewayTest.py @@ -19,7 +19,7 @@ class EventBsedGatewayTest(BpmnWorkflowTestCase): def testEventBasedGatewaySaveRestore(self): self.actual_test(True) - + def actual_test(self, save_restore=False): self.workflow.do_engine_steps() @@ -36,6 +36,20 @@ class EventBsedGatewayTest(BpmnWorkflowTestCase): self.assertEqual(self.workflow.get_tasks_from_spec_name('message_2_event')[0].state, TaskState.CANCELLED) self.assertEqual(self.workflow.get_tasks_from_spec_name('timer_event')[0].state, TaskState.CANCELLED) + def testTimeout(self): + + self.workflow.do_engine_steps() + waiting_tasks = self.workflow.get_waiting_tasks() + self.assertEqual(len(waiting_tasks), 1) + timer_event = waiting_tasks[0].task_spec.event_definition.event_definitions[-1] + self.workflow.catch(timer_event) + self.workflow.refresh_waiting_tasks() + self.workflow.do_engine_steps() + self.assertEqual(self.workflow.is_completed(), True) + self.assertEqual(self.workflow.get_tasks_from_spec_name('message_1_event')[0].state, TaskState.CANCELLED) + self.assertEqual(self.workflow.get_tasks_from_spec_name('message_2_event')[0].state, TaskState.CANCELLED) + self.assertEqual(self.workflow.get_tasks_from_spec_name('timer_event')[0].state, TaskState.COMPLETED) + def testMultipleStart(self): spec, subprocess = self.load_workflow_spec('multiple-start-parallel.bpmn', 'main') workflow = BpmnWorkflow(spec)