spiff-arena/tests/SpiffWorkflow/bpmn/events/CallActivityEscalationTest.py
jasquat 69e814758e Squashed 'SpiffWorkflow/' changes from 12f81480a..d27519a36
d27519a36 Merge pull request #259 from sartography/bugfix/spiff-postscript-execution
21aa8a12c update execution order for postscripts
d83fd3d81 Merge pull request #256 from sartography/feature/xml-validation
8303aaab5 uping the sleep time in a test slightly to see if we can get this test to pass consistently in CI.
1d251d55d determine whether to validate by passing in a validator instead of a parameter
2d3daad2d add spiff schema
f8c65dc60 Minor changes to BPMN diagrams to assure all tests are run against valid BPMN Diagrams. Changes required:
9e06b25bf add DMN validation
1b7cbeba0 set parser to validate by default
53fdbba52 add schemas & validation option
a212d9c5d general cleanup

git-subtree-dir: SpiffWorkflow
git-subtree-split: d27519a3631b9772094e5f24dba2f478b0c47135
2022-10-27 10:50:48 -04:00

154 lines
7.0 KiB
Python

# -*- coding: utf-8 -*-
import unittest
from SpiffWorkflow.task import TaskState
from SpiffWorkflow.bpmn.workflow import BpmnWorkflow
from tests.SpiffWorkflow.bpmn.BpmnWorkflowTestCase import BpmnWorkflowTestCase
__author__ = 'kbogus@gmail.com'
def on_reached_cb(workflow, task, completed_set):
    """Reached-event callback that propagates tracking to a task's children.

    In workflows that load a subworkflow, the newly loaded children will not
    have on_reached_cb() assigned. By re-assigning the callbacks in every
    step, new children also call on_reached_cb().

    :param workflow: the workflow that emitted the event (unused here).
    :param task: the task that was reached; its ``children`` are tracked.
    :param completed_set: set handed on to ``track_task`` for each child.
    :returns: always ``True``.
    """
    for child in task.children:
        track_task(child.task_spec, completed_set)
    return True
def on_complete_cb(workflow, task, completed_set):
    """Completed-event callback that records the finished task spec's name.

    :param workflow: the workflow that emitted the event (unused here).
    :param task: the completed task; ``task.task_spec.name`` is recorded.
    :param completed_set: set that collects the names of completed specs.
    :returns: always ``True``.
    """
    completed_set.add(task.task_spec.name)
    return True
def track_task(task_spec, completed_set):
    """(Re)connect the tracking callbacks to a single task spec.

    Any existing connection of ``on_reached_cb`` / ``on_complete_cb`` is
    removed first and the callback is then connected again with
    ``completed_set``, so calling this repeatedly for the same spec leaves
    one connection per event, bound to the most recent set.

    :param task_spec: object exposing ``reached_event`` and
        ``completed_event`` with ``is_connected``/``disconnect``/``connect``.
    :param completed_set: set passed to the callbacks as their extra argument.
    """
    # Disconnect-then-connect: the connect calls are unconditional, only the
    # disconnect depends on a previous registration.
    if task_spec.reached_event.is_connected(on_reached_cb):
        task_spec.reached_event.disconnect(on_reached_cb)
    task_spec.reached_event.connect(on_reached_cb, completed_set)

    if task_spec.completed_event.is_connected(on_complete_cb):
        task_spec.completed_event.disconnect(on_complete_cb)
    task_spec.completed_event.connect(on_complete_cb, completed_set)
def track_workflow(wf_spec, completed_set):
    """Attach the tracking callbacks to every task spec of a workflow spec.

    :param wf_spec: workflow spec whose ``task_specs`` is iterated by name
        and indexed by that name.
    :param completed_set: set that will collect names of completed specs.
    """
    for name in wf_spec.task_specs:
        track_task(wf_spec.task_specs[name], completed_set)
class CallActivityEscalationTest(BpmnWorkflowTestCase):
    """Run the 'CallActivity-Escalation-Test' process and check its end events.

    Each test drives the workflow to completion and then asserts which
    ``EndEvent_*`` task specs fired their completed event, as recorded by
    ``track_workflow`` in a set of task-spec names.
    """

    def setUp(self):
        """Load the escalation test spec and build a fresh workflow from it."""
        # NOTE(review): the meaning of the third positional argument (False)
        # is defined by BpmnWorkflowTestCase.load_workflow_spec - confirm there.
        self.spec, subprocesses = self.load_workflow_spec(
            'Test-Workflows/*.bpmn20.xml', 'CallActivity-Escalation-Test', False)
        self.workflow = BpmnWorkflow(self.spec, subprocesses)

    def _run_workflow(self, **task_data):
        """Drive the workflow to completion and return the completed spec names.

        :param task_data: if any keyword is given, it is set as data on every
            READY task before the engine runs; with no keywords the READY
            tasks are left untouched.
        :returns: the set of task-spec names whose completed event fired.
        """
        completed_set = set()
        track_workflow(self.spec, completed_set)
        if task_data:
            for task in self.workflow.get_tasks(TaskState.READY):
                task.set_data(**task_data)
        self.workflow.do_engine_steps()
        self.save_restore()
        self.workflow.complete_all()
        self.assertTrue(self.workflow.is_completed())
        return completed_set

    def testShouldEscalate(self):
        """Check the end events reached when should_escalate is True."""
        completed_set = self._run_workflow(should_escalate=True)

        self.assertIn('EndEvent_specific1_noninterrupting_normal', completed_set)
        self.assertIn('EndEvent_specific1_noninterrupting_escalated', completed_set)

        self.assertNotIn('EndEvent_specific1_interrupting_normal', completed_set)
        self.assertIn('EndEvent_specific1_interrupting_escalated', completed_set)

        self.assertIn('EndEvent_specific2_noninterrupting_normal', completed_set)
        self.assertIn('EndEvent_specific2_noninterrupting_escalated', completed_set)
        self.assertNotIn('EndEvent_specific2_noninterrupting_missingvariable', completed_set)

        self.assertNotIn('EndEvent_specific2_interrupting_normal', completed_set)
        self.assertIn('EndEvent_specific2_interrupting_escalated', completed_set)
        self.assertNotIn('EndEvent_specific2_interrupting_missingvariable', completed_set)

        self.assertIn('EndEvent_general_noninterrupting_normal', completed_set)
        self.assertIn('EndEvent_general_noninterrupting_escalated', completed_set)

        self.assertNotIn('EndEvent_general_interrupting_normal', completed_set)
        self.assertIn('EndEvent_general_interrupting_escalated', completed_set)

    def testShouldNotEscalate(self):
        """Check the end events reached when should_escalate is False."""
        completed_set = self._run_workflow(should_escalate=False)

        self.assertIn('EndEvent_specific1_noninterrupting_normal', completed_set)
        self.assertNotIn('EndEvent_specific1_noninterrupting_escalated', completed_set)

        self.assertIn('EndEvent_specific1_interrupting_normal', completed_set)
        self.assertNotIn('EndEvent_specific1_interrupting_escalated', completed_set)

        self.assertIn('EndEvent_specific2_noninterrupting_normal', completed_set)
        self.assertNotIn('EndEvent_specific2_noninterrupting_escalated', completed_set)
        self.assertNotIn('EndEvent_specific2_noninterrupting_missingvariable', completed_set)

        self.assertIn('EndEvent_specific2_interrupting_normal', completed_set)
        self.assertNotIn('EndEvent_specific2_interrupting_escalated', completed_set)
        self.assertNotIn('EndEvent_specific2_interrupting_missingvariable', completed_set)

        self.assertIn('EndEvent_general_noninterrupting_normal', completed_set)
        self.assertNotIn('EndEvent_general_noninterrupting_escalated', completed_set)

        self.assertIn('EndEvent_general_interrupting_normal', completed_set)
        self.assertNotIn('EndEvent_general_interrupting_escalated', completed_set)

    def testMissingVariable(self):
        """Check the end events reached when should_escalate is never set."""
        completed_set = self._run_workflow()

        self.assertIn('EndEvent_specific1_noninterrupting_normal', completed_set)
        self.assertNotIn('EndEvent_specific1_noninterrupting_escalated', completed_set)

        self.assertIn('EndEvent_specific1_interrupting_normal', completed_set)
        self.assertNotIn('EndEvent_specific1_interrupting_escalated', completed_set)

        self.assertIn('EndEvent_specific2_noninterrupting_normal', completed_set)
        self.assertNotIn('EndEvent_specific2_noninterrupting_escalated', completed_set)
        self.assertIn('EndEvent_specific2_noninterrupting_missingvariable', completed_set)

        self.assertNotIn('EndEvent_specific2_interrupting_normal', completed_set)
        self.assertNotIn('EndEvent_specific2_interrupting_escalated', completed_set)
        self.assertIn('EndEvent_specific2_interrupting_missingvariable', completed_set)

        self.assertIn('EndEvent_general_noninterrupting_normal', completed_set)
        self.assertIn('EndEvent_general_noninterrupting_escalated', completed_set)

        self.assertNotIn('EndEvent_general_interrupting_normal', completed_set)
        self.assertIn('EndEvent_general_interrupting_escalated', completed_set)
class CallActivityEscalationWithoutSaveRestoreTest(CallActivityEscalationTest):
    """Re-run the inherited escalation tests with save_restore() disabled."""

    def save_restore(self):
        """Do nothing, so the inherited tests skip their save_restore step."""
        pass  # disabling save_restore for this test case
def suite():
    """Build a TestSuite holding both escalation test cases.

    :returns: a ``unittest.TestSuite`` with the tests of
        ``CallActivityEscalationTest`` and
        ``CallActivityEscalationWithoutSaveRestoreTest``.
    """
    loader = unittest.TestLoader()
    return unittest.TestSuite([
        loader.loadTestsFromTestCase(cls)
        for cls in [
            CallActivityEscalationTest,
            CallActivityEscalationWithoutSaveRestoreTest,
        ]
    ])
# Run both test cases verbosely when this file is executed as a script.
if __name__ == '__main__':
    unittest.TextTestRunner(verbosity=2).run(suite())