spiff-arena/tests/SpiffWorkflow/specs/SubWorkflowTest.py
Dan 8bebd09880 Squashed 'SpiffWorkflow/' changes from 2ca6ebf80..7b39b2235
7b39b2235 Merge pull request #300 from sartography/bugfix/remove-minidom-dependency
0642d48b1 remove minidom

git-subtree-dir: SpiffWorkflow
git-subtree-split: 7b39b223562eb510dd68c8d451922721ebb721a7
2023-02-27 14:06:23 -05:00

134 lines
4.6 KiB
Python

# -*- coding: utf-8 -*-
import sys
import unittest
import os
from lxml import etree
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..', '..'))
from SpiffWorkflow.specs.WorkflowSpec import WorkflowSpec
from SpiffWorkflow.specs.SubWorkflow import SubWorkflow
from SpiffWorkflow.serializer.prettyxml import XmlSerializer
from SpiffWorkflow.task import TaskState
from SpiffWorkflow.workflow import Workflow
class TaskSpecTest(unittest.TestCase):
CORRELATE = SubWorkflow
def testConstructor(self):
pass # FIXME
def testSerialize(self):
pass # FIXME
def testTest(self):
pass # FIXME
def load_workflow_spec(self, folder, f):
file = os.path.join(
os.path.dirname(__file__), '..', 'data', 'spiff', folder, f)
serializer = XmlSerializer()
with open(file) as fp:
xml = etree.parse(fp).getroot()
self.wf_spec = WorkflowSpec.deserialize(
serializer, xml, filename=file)
self.workflow = Workflow(self.wf_spec)
def do_next_unique_task(self, name):
# This method asserts that there is only one ready task! The specified
# one - and then completes it
for task in self.workflow.get_tasks(TaskState.WAITING):
task.task_spec._update(task)
ready_tasks = self.workflow.get_tasks(TaskState.READY)
self.assertEqual(1, len(ready_tasks))
task = ready_tasks[0]
self.assertEqual(name, task.task_spec.name)
task.complete()
def do_next_named_step(self, name, other_ready_tasks):
# This method completes a single task from the specified set of ready
# tasks
ready_tasks = self.workflow.get_tasks(TaskState.READY)
all_tasks = sorted([name] + other_ready_tasks)
self.assertEqual(
all_tasks, sorted([t.task_spec.name for t in ready_tasks]))
task = list([t for t in ready_tasks if t.task_spec.name == name])[0]
task.complete()
def test_block_to_subworkflow(self):
self.load_workflow_spec('data', 'block_to_subworkflow.xml')
self.do_next_unique_task('Start')
self.do_next_unique_task('first')
# Inner. The subworkflow task will complete automatically after the subwokflow completes
self.do_next_unique_task('Start')
self.do_next_unique_task('first')
self.do_next_unique_task('last')
self.do_next_unique_task('End')
# Back to outer:
self.do_next_unique_task('last')
self.do_next_unique_task('End')
def test_subworkflow_to_block(self):
self.load_workflow_spec('data', 'subworkflow_to_block.xml')
self.do_next_unique_task('Start')
self.do_next_unique_task('first')
# Inner:
self.do_next_unique_task('Start')
self.do_next_unique_task('first')
self.do_next_unique_task('last')
self.do_next_unique_task('End')
# Back to outer:
self.do_next_unique_task('last')
self.do_next_unique_task('End')
def test_subworkflow_to_join(self):
self.load_workflow_spec('control-flow', 'subworkflow_to_join.xml')
self.do_next_unique_task('Start')
self.do_next_unique_task('first')
# The subworkflow task now sets its child tasks to READY and waits
self.do_next_named_step('second', ['Start'])
# Inner:
self.do_next_unique_task('Start')
self.do_next_unique_task('first')
self.do_next_unique_task('last')
self.do_next_unique_task('End')
# Back to outer:
self.do_next_unique_task('join')
self.do_next_unique_task('last')
self.do_next_unique_task('End')
def test_subworkflow_to_join_refresh_waiting(self):
self.load_workflow_spec('control-flow', 'subworkflow_to_join.xml')
self.do_next_unique_task('Start')
self.do_next_unique_task('first')
self.do_next_named_step('second', ['Start'])
# Inner:
self.do_next_unique_task('Start')
self.do_next_unique_task('first')
# Now refresh waiting tasks:
# Update the state of every WAITING task.
for thetask in self.workflow._get_waiting_tasks():
thetask.task_spec._update(thetask)
self.do_next_unique_task('last')
self.do_next_unique_task('End')
# Back to outer:
self.do_next_unique_task('join')
self.do_next_unique_task('last')
self.do_next_unique_task('End')
def suite():
return unittest.TestLoader().loadTestsFromTestCase(TaskSpecTest)
if __name__ == '__main__':
unittest.TextTestRunner(verbosity=2).run(suite())