140 lines
5.7 KiB
Python
140 lines
5.7 KiB
Python
import json
|
|
import unittest
|
|
|
|
from tests.base_test import BaseTest
|
|
|
|
from crc.services.workflow_processor import WorkflowProcessor
|
|
from crc.services.workflow_service import WorkflowService
|
|
from SpiffWorkflow import Task as SpiffTask, WorkflowException
|
|
from example_data import ExampleDataLoader
|
|
from crc import db
|
|
from crc.models.task_event import TaskEventModel
|
|
from crc.models.api_models import Task
|
|
from crc.api.common import ApiError
|
|
|
|
|
|
class TestWorkflowService(BaseTest):
|
|
|
|
def test_documentation_processing_handles_replacements(self):
|
|
self.load_example_data()
|
|
workflow = self.create_workflow('random_fact')
|
|
processor = WorkflowProcessor(workflow)
|
|
processor.do_engine_steps()
|
|
|
|
task = processor.next_task()
|
|
task.task_spec.documentation = "Some simple docs"
|
|
docs = WorkflowService._process_documentation(task)
|
|
self.assertEqual("Some simple docs", docs)
|
|
|
|
task.data = {"replace_me": "new_thing"}
|
|
task.task_spec.documentation = "{{replace_me}}"
|
|
docs = WorkflowService._process_documentation(task)
|
|
self.assertEqual("new_thing", docs)
|
|
|
|
documentation = """
|
|
# Bigger Test
|
|
|
|
* bullet one
|
|
* bullet two has {{replace_me}}
|
|
|
|
# other stuff.
|
|
"""
|
|
expected = """
|
|
# Bigger Test
|
|
|
|
* bullet one
|
|
* bullet two has new_thing
|
|
|
|
# other stuff.
|
|
"""
|
|
task.task_spec.documentation = documentation
|
|
result = WorkflowService._process_documentation(task)
|
|
self.assertEqual(expected, result)
|
|
|
|
def test_documentation_processing_handles_conditionals(self):
|
|
|
|
self.load_example_data()
|
|
workflow = self.create_workflow('random_fact')
|
|
processor = WorkflowProcessor(workflow)
|
|
processor.do_engine_steps()
|
|
|
|
task = processor.next_task()
|
|
task.task_spec.documentation = "This test {% if works == 'yes' %}works{% endif %}"
|
|
docs = WorkflowService._process_documentation(task)
|
|
self.assertEqual("This test ", docs)
|
|
|
|
task.data = {"works": 'yes'}
|
|
docs = WorkflowService._process_documentation(task)
|
|
self.assertEqual("This test works", docs)
|
|
|
|
def test_enum_options_from_file(self):
|
|
self.load_example_data()
|
|
workflow = self.create_workflow('enum_options_from_file')
|
|
processor = WorkflowProcessor(workflow)
|
|
processor.do_engine_steps()
|
|
task = processor.next_task()
|
|
WorkflowService.process_options(task, task.task_spec.form.fields[0])
|
|
options = task.task_spec.form.fields[0].options
|
|
self.assertEqual(28, len(options))
|
|
self.assertEqual('1000', options[0]['id'])
|
|
self.assertEqual("UVA - INTERNAL - GM USE ONLY", options[0]['name'])
|
|
|
|
def test_random_data_populate_form_on_auto_complete(self):
|
|
self.load_example_data()
|
|
workflow = self.create_workflow('enum_options_with_search')
|
|
processor = WorkflowProcessor(workflow)
|
|
processor.do_engine_steps()
|
|
task = processor.next_task()
|
|
task_api = WorkflowService.spiff_task_to_api_task(task, add_docs_and_forms=True)
|
|
WorkflowService.populate_form_with_random_data(task, task_api, required_only=False)
|
|
self.assertTrue(isinstance(task.data["sponsor"], dict))
|
|
|
|
@unittest.skip("RRT no longer needs to be supported")
|
|
def test_fix_legacy_data_model_for_rrt(self):
|
|
ExampleDataLoader().load_rrt() # Make sure the research_rampup is loaded, as it's not a test spec.
|
|
workflow = self.create_workflow('research_rampup')
|
|
processor = WorkflowProcessor(workflow, validate_only=True)
|
|
|
|
# Use the test spec code to complete the workflow of research rampup.
|
|
while not processor.bpmn_workflow.is_completed():
|
|
processor.bpmn_workflow.do_engine_steps()
|
|
tasks = processor.bpmn_workflow.get_tasks(SpiffTask.READY)
|
|
for task in tasks:
|
|
task_api = WorkflowService.spiff_task_to_api_task(task, add_docs_and_forms=True)
|
|
WorkflowService.populate_form_with_random_data(task, task_api, False)
|
|
task.complete()
|
|
# create the task events
|
|
WorkflowService.log_task_action('dhf8r', processor, task,
|
|
WorkflowService.TASK_ACTION_COMPLETE)
|
|
processor.save()
|
|
db.session.commit()
|
|
|
|
WorkflowService.fix_legacy_data_model_for_rrt()
|
|
|
|
# All tasks should now have data associated with them.
|
|
task_logs = db.session.query(TaskEventModel) \
|
|
.filter(TaskEventModel.workflow_id == workflow.id) \
|
|
.filter(TaskEventModel.action == WorkflowService.TASK_ACTION_COMPLETE) \
|
|
.order_by(TaskEventModel.date).all() # Get them back in order.
|
|
|
|
self.assertEqual(17, len(task_logs))
|
|
for log in task_logs:
|
|
task = processor.bpmn_workflow.get_tasks_from_spec_name(log.task_name)[0]
|
|
self.assertIsNotNone(log.form_data)
|
|
# Each task should have the data in the form for that task in the task event.
|
|
if hasattr(task.task_spec, 'form'):
|
|
for field in task.task_spec.form.fields:
|
|
if field.has_property(Task.PROP_OPTIONS_REPEAT):
|
|
self.assertIn(field.get_property(Task.PROP_OPTIONS_REPEAT), log.form_data)
|
|
else:
|
|
self.assertIn(field.id, log.form_data)
|
|
|
|
# Some spot checks:
|
|
# The first task should be empty, with all the data removed.
|
|
self.assertEqual({}, task_logs[0].form_data)
|
|
|
|
def test_dmn_evaluation_errors_in_oncomplete_raise_api_errors_during_validation(self):
|
|
workflow_spec_model = self.load_test_spec("decision_table_invalid")
|
|
with self.assertRaises(ApiError):
|
|
WorkflowService.test_spec(workflow_spec_model.id)
|