2020-06-17 17:11:15 -04:00
|
|
|
import json
|
2020-07-17 11:51:21 -04:00
|
|
|
import unittest
|
2020-06-17 17:11:15 -04:00
|
|
|
|
2020-05-25 15:30:06 -04:00
|
|
|
from tests.base_test import BaseTest
|
2020-05-29 01:39:39 -04:00
|
|
|
|
2020-04-15 11:13:32 -04:00
|
|
|
from crc.services.workflow_processor import WorkflowProcessor
|
|
|
|
from crc.services.workflow_service import WorkflowService
|
2020-06-17 17:11:15 -04:00
|
|
|
from SpiffWorkflow import Task as SpiffTask, WorkflowException
|
|
|
|
from example_data import ExampleDataLoader
|
|
|
|
from crc import db
|
2020-07-14 22:16:44 -04:00
|
|
|
from crc.models.task_event import TaskEventModel
|
2020-06-17 17:11:15 -04:00
|
|
|
from crc.models.api_models import Task
|
2020-07-15 11:16:35 -04:00
|
|
|
from crc.api.common import ApiError
|
2020-04-15 11:13:32 -04:00
|
|
|
|
|
|
|
|
|
|
|
class TestWorkflowService(BaseTest):
    """Tests for ``WorkflowService``.

    Covers documentation template processing, enum option loading, random
    form population and validation of a spec with an invalid decision table.
    """

    def _start_workflow_and_get_first_task(self, workflow_name):
        """Start the named example workflow and return its next task.

        Performs the setup shared by the tests in this class, in order:
        load the example data, create the workflow, wrap it in a
        ``WorkflowProcessor``, run the engine steps and ask the processor
        for its next task.

        Args:
            workflow_name: Name passed to ``self.create_workflow``.

        Returns:
            Whatever ``WorkflowProcessor.next_task()`` returns for the
            freshly started workflow.
        """
        self.load_example_data()
        workflow = self.create_workflow(workflow_name)
        processor = WorkflowProcessor(workflow)
        processor.do_engine_steps()
        return processor.next_task()

    def test_documentation_processing_handles_replacements(self):
        """``{{name}}`` placeholders are filled in from ``task.data``."""
        task = self._start_workflow_and_get_first_task('random_fact')

        # Text without placeholders comes back unchanged.
        task.task_spec.documentation = "Some simple docs"
        docs = WorkflowService._process_documentation(task)
        self.assertEqual("Some simple docs", docs)

        # A lone placeholder is replaced by the matching task data value.
        task.data = {"replace_me": "new_thing"}
        task.task_spec.documentation = "{{replace_me}}"
        docs = WorkflowService._process_documentation(task)
        self.assertEqual("new_thing", docs)

        # Same replacement inside a larger, multi-line markdown document.
        # NOTE(review): the template syntax looks like Jinja2 - if so, a
        # single trailing newline is stripped on render, so keep the closing
        # quotes indented on their own line in both literals. Confirm.
        documentation = """
# Bigger Test

* bullet one
* bullet two has {{replace_me}}

# other stuff.
        """
        expected = """
# Bigger Test

* bullet one
* bullet two has new_thing

# other stuff.
        """
        task.task_spec.documentation = documentation
        result = WorkflowService._process_documentation(task)
        self.assertEqual(expected, result)

    def test_documentation_processing_handles_conditionals(self):
        """``{% if %}`` blocks are rendered according to ``task.data``."""
        task = self._start_workflow_and_get_first_task('random_fact')

        # Before ``works`` is set in the task data the conditional text is
        # left out of the result.
        task.task_spec.documentation = "This test {% if works == 'yes' %}works{% endif %}"
        docs = WorkflowService._process_documentation(task)
        self.assertEqual("This test ", docs)

        # Once the condition holds, the guarded text is included.
        task.data = {"works": 'yes'}
        docs = WorkflowService._process_documentation(task)
        self.assertEqual("This test works", docs)

    def test_enum_options_from_file(self):
        """``process_options`` fills the first form field's option list."""
        task = self._start_workflow_and_get_first_task('enum_options_from_file')
        WorkflowService.process_options(task, task.task_spec.form.fields[0])

        # Re-read the field after processing rather than reusing a reference
        # taken before the call.
        options = task.task_spec.form.fields[0].options
        self.assertEqual(29, len(options))
        self.assertEqual('0', options[0]['id'])
        self.assertEqual("Other", options[0]['name'])

    def test_random_data_populate_form_on_auto_complete(self):
        """Random form data stores a dict under ``sponsor`` in the task data."""
        task = self._start_workflow_and_get_first_task('enum_options_with_search')
        task_api = WorkflowService.spiff_task_to_api_task(task, add_docs_and_forms=True)
        WorkflowService.populate_form_with_random_data(task, task_api, required_only=False)

        # assertIsInstance reports the actual type on failure, unlike
        # assertTrue(isinstance(...)).
        self.assertIsInstance(task.data["sponsor"], dict)

    def test_dmn_evaluation_errors_in_oncomplete_raise_api_errors_during_validation(self):
        """Validating the ``decision_table_invalid`` spec raises ``ApiError``."""
        workflow_spec_model = self.load_test_spec("decision_table_invalid")
        with self.assertRaises(ApiError):
            WorkflowService.test_spec(workflow_spec_model.id)