mirror of
https://github.com/sartography/spiff-arena.git
synced 2025-02-09 16:14:47 +00:00
ffb168675 Option to run tests in parallel (#271) 062eaf15d another hot match -- assure hit policy is correctly passed through. c79ee8407 Quick patch the DMN hit policy to fix a dump mistake. 36dd1b23a Fix ResourceWarning: unclosed file BpmnParser.py:60 (#270) bba7ddf54 Merge pull request #268 from sartography/feature/multiple-event-definition 8cf770985 remove unused import 9d31e035e make multiple throw events work with start events 890c4b921 add throw support for multiple events c1fc55660 add support for catching parallel multiple event definitions 511830b67 add event based gateway 56bd858dc add event type for multiple events git-subtree-dir: SpiffWorkflow git-subtree-split: ffb1686757f944065580dd2db8def73d6c1f0134
47 lines
1.6 KiB
Python
47 lines
1.6 KiB
Python
from SpiffWorkflow.bpmn.workflow import BpmnWorkflow
|
|
|
|
from ..BpmnWorkflowTestCase import BpmnWorkflowTestCase
|
|
|
|
|
|
class MultipleThrowEventIntermediateCatchTest(BpmnWorkflowTestCase):
    """Run the workflow loaded from ``multiple-throw.bpmn`` to completion.

    The same scenario is exercised twice: once directly, and once with a
    ``save_restore()`` call issued before the engine steps are run.
    """

    def setUp(self):
        """Load the collaboration and build the workflow under test."""
        loaded = self.load_collaboration('multiple-throw.bpmn', 'top')
        self.spec, subprocess_specs = loaded
        self.workflow = BpmnWorkflow(self.spec, subprocess_specs)

    def testMultipleThrowEventIntermediateCatch(self):
        """Run the scenario without a save/restore step."""
        self.actual_test()

    def testMultipleThrowEventIntermediateCatchSaveRestore(self):
        """Run the scenario with a save/restore step first."""
        self.actual_test(True)

    def actual_test(self, save_restore=False):
        """Drive the workflow and check that it finishes with nothing waiting.

        :param save_restore: when true, call ``self.save_restore()`` before
            running the engine steps.
        """
        if save_restore:
            self.save_restore()

        self.workflow.do_engine_steps()

        # After the engine has run, no task may be left waiting and the
        # workflow must report itself as completed.
        waiting = self.workflow.get_waiting_tasks()
        self.assertEqual(len(waiting), 0)
        completed = self.workflow.is_completed()
        self.assertEqual(completed, True)
|
|
|
|
|
|
class MultipleThrowEventStartsEventTest(BpmnWorkflowTestCase):
    """Run the workflow loaded from ``multiple-throw-start.bpmn``.

    The spec stored under the key ``'initiate'`` is used as the workflow's
    spec; the remaining specs are handed to ``BpmnWorkflow`` as its second
    argument. The scenario is exercised with and without a
    ``save_restore()`` call issued before the engine steps are run.
    """

    def setUp(self):
        """Load every spec from the file and build the workflow under test."""
        remaining_specs = self.get_all_specs('multiple-throw-start.bpmn')
        # pop() removes 'initiate' so that only the other specs are passed
        # as the second BpmnWorkflow argument.
        self.spec = remaining_specs.pop('initiate')
        self.workflow = BpmnWorkflow(self.spec, remaining_specs)

    def testMultipleThrowEventStartEvent(self):
        """Run the scenario without a save/restore step."""
        self.actual_test()

    def testMultipleThrowEventStartEventSaveRestore(self):
        """Run the scenario with a save/restore step first."""
        self.actual_test(True)

    def actual_test(self, save_restore=False):
        """Drive the workflow through its single user task to completion.

        :param save_restore: when true, call ``self.save_restore()`` before
            running the engine steps.
        """
        if save_restore:
            self.save_restore()

        self.workflow.do_engine_steps()

        # Exactly one user task is expected to be ready at this point.
        user_tasks = self.workflow.get_ready_user_tasks()
        self.assertEqual(len(user_tasks), 1)

        first_task = user_tasks[0]
        first_task.complete()

        # Completing that task and stepping the engine again must finish
        # the workflow.
        self.workflow.do_engine_steps()
        completed = self.workflow.is_completed()
        self.assertEqual(completed, True)