mirror of
https://github.com/sartography/cr-connect-workflow.git
synced 2025-02-23 05:08:32 +00:00
PB_ENABLED can be set to false in the configuration (either in a file called instance/config.py, or as an environment variable) Added a check in the base_test, to assure that we are always running tests with the test configuration, and bail out otherwise. Setting TESTING=true as an environment variable will get this, but so well the correct ordering of imports. Just be dead certain the first file every test file imports is base_test.py. Aaron was right, and we call the Protocol Builder in all kinds of awful places. But we don't do this now. So Carlos, you should have the ability to reuse a lot of the logic in the study_service now. I dropped the poorly named "study-update" endpoint completely. We weren't using it. POST and PUT to Study still work just fine for doing exactly that. All the tests now run and pass with the Protocol builder disabled. Tests that specifically check PB behavior turn it back on for the test, or mock it out.
450 lines
20 KiB
Python
450 lines
20 KiB
Python
import json
|
|
import os
|
|
import random
|
|
from unittest.mock import patch
|
|
|
|
from crc import session, app
|
|
from crc.models.api_models import WorkflowApiSchema, MultiInstanceType, TaskSchema
|
|
from crc.models.file import FileModelSchema
|
|
from crc.models.stats import TaskEventModel
|
|
from crc.models.workflow import WorkflowStatus
|
|
from crc.services.protocol_builder import ProtocolBuilderService
|
|
from crc.services.workflow_service import WorkflowService
|
|
from tests.base_test import BaseTest
|
|
|
|
|
|
class TestTasksApi(BaseTest):
|
|
|
|
def get_workflow_api(self, workflow, soft_reset=False, hard_reset=False):
|
|
rv = self.app.get('/v1.0/workflow/%i?soft_reset=%s&hard_reset=%s' %
|
|
(workflow.id, str(soft_reset), str(hard_reset)),
|
|
headers=self.logged_in_headers(),
|
|
content_type="application/json")
|
|
self.assert_success(rv)
|
|
json_data = json.loads(rv.get_data(as_text=True))
|
|
workflow_api = WorkflowApiSchema().load(json_data)
|
|
self.assertEqual(workflow.workflow_spec_id, workflow_api.workflow_spec_id)
|
|
return workflow_api
|
|
|
|
def complete_form(self, workflow_in, task_in, dict_data, error_code = None):
|
|
prev_completed_task_count = workflow_in.completed_tasks
|
|
if isinstance(task_in, dict):
|
|
task_id = task_in["id"]
|
|
else:
|
|
task_id = task_in.id
|
|
rv = self.app.put('/v1.0/workflow/%i/task/%s/data' % (workflow_in.id, task_id),
|
|
headers=self.logged_in_headers(),
|
|
content_type="application/json",
|
|
data=json.dumps(dict_data))
|
|
if error_code:
|
|
self.assert_failure(rv, error_code=error_code)
|
|
return
|
|
|
|
self.assert_success(rv)
|
|
json_data = json.loads(rv.get_data(as_text=True))
|
|
|
|
# Assure stats are updated on the model
|
|
workflow = WorkflowApiSchema().load(json_data)
|
|
# The total number of tasks may change over time, as users move through gateways
|
|
# branches may be pruned. As we hit parallel Multi-Instance new tasks may be created...
|
|
self.assertIsNotNone(workflow.total_tasks)
|
|
self.assertEquals(prev_completed_task_count + 1, workflow.completed_tasks)
|
|
# Assure a record exists in the Task Events
|
|
task_events = session.query(TaskEventModel) \
|
|
.filter_by(workflow_id=workflow.id) \
|
|
.filter_by(task_id=task_id) \
|
|
.order_by(TaskEventModel.date.desc()).all()
|
|
self.assertGreater(len(task_events), 0)
|
|
event = task_events[0]
|
|
self.assertIsNotNone(event.study_id)
|
|
self.assertEquals("dhf8r", event.user_uid)
|
|
self.assertEquals(workflow.id, event.workflow_id)
|
|
self.assertEquals(workflow.workflow_spec_id, event.workflow_spec_id)
|
|
self.assertEquals(workflow.spec_version, event.spec_version)
|
|
self.assertEquals(WorkflowService.TASK_ACTION_COMPLETE, event.action)
|
|
self.assertEquals(task_in.id, task_id)
|
|
self.assertEquals(task_in.name, event.task_name)
|
|
self.assertEquals(task_in.title, event.task_title)
|
|
self.assertEquals(task_in.type, event.task_type)
|
|
self.assertEquals("COMPLETED", event.task_state)
|
|
# Not sure what vodoo is happening inside of marshmallow to get me in this state.
|
|
if isinstance(task_in.multi_instance_type, MultiInstanceType):
|
|
self.assertEquals(task_in.multi_instance_type.value, event.mi_type)
|
|
else:
|
|
self.assertEquals(task_in.multi_instance_type, event.mi_type)
|
|
|
|
self.assertEquals(task_in.multi_instance_count, event.mi_count)
|
|
self.assertEquals(task_in.multi_instance_index, event.mi_index)
|
|
self.assertEquals(task_in.process_name, event.process_name)
|
|
self.assertIsNotNone(event.date)
|
|
|
|
|
|
workflow = WorkflowApiSchema().load(json_data)
|
|
return workflow
|
|
|
|
|
|
def test_get_current_user_tasks(self):
|
|
self.load_example_data()
|
|
workflow = self.create_workflow('random_fact')
|
|
workflow = self.get_workflow_api(workflow)
|
|
task = workflow.next_task
|
|
self.assertEqual("Task_User_Select_Type", task.name)
|
|
self.assertEqual(3, len(task.form["fields"][0]["options"]))
|
|
self.assertIsNotNone(task.documentation)
|
|
expected_docs = """# h1 Heading 8-)
|
|
## h2 Heading
|
|
### h3 Heading
|
|
#### h4 Heading
|
|
##### h5 Heading
|
|
###### h6 Heading
|
|
"""
|
|
self.assertTrue(str.startswith(task.documentation, expected_docs))
|
|
|
|
def test_two_forms_task(self):
|
|
# Set up a new workflow
|
|
self.load_example_data()
|
|
workflow = self.create_workflow('two_forms')
|
|
# get the first form in the two form workflow.
|
|
workflow_api = self.get_workflow_api(workflow)
|
|
self.assertEqual('two_forms', workflow_api.workflow_spec_id)
|
|
self.assertEqual(2, len(workflow_api.navigation))
|
|
self.assertIsNotNone(workflow_api.next_task.form)
|
|
self.assertEqual("UserTask", workflow_api.next_task.type)
|
|
self.assertEqual("StepOne", workflow_api.next_task.name)
|
|
self.assertEqual(1, len(workflow_api.next_task.form['fields']))
|
|
|
|
# Complete the form for Step one and post it.
|
|
self.complete_form(workflow, workflow_api.next_task, {"color": "blue"})
|
|
|
|
# Get the next Task
|
|
workflow_api = self.get_workflow_api(workflow)
|
|
self.assertEqual("StepTwo", workflow_api.next_task.name)
|
|
|
|
# Get all user Tasks and check that the data have been saved
|
|
task = workflow_api.next_task
|
|
self.assertIsNotNone(task.data)
|
|
for val in task.data.values():
|
|
self.assertIsNotNone(val)
|
|
|
|
def test_error_message_on_bad_gateway_expression(self):
|
|
self.load_example_data()
|
|
workflow = self.create_workflow('exclusive_gateway')
|
|
|
|
# get the first form in the two form workflow.
|
|
task = self.get_workflow_api(workflow).next_task
|
|
self.complete_form(workflow, task, {"has_bananas": True})
|
|
|
|
def test_workflow_with_parallel_forms(self):
|
|
self.load_example_data()
|
|
workflow = self.create_workflow('exclusive_gateway')
|
|
|
|
# get the first form in the two form workflow.
|
|
task = self.get_workflow_api(workflow).next_task
|
|
self.complete_form(workflow, task, {"has_bananas": True})
|
|
|
|
# Get the next Task
|
|
workflow_api = self.get_workflow_api(workflow)
|
|
self.assertEqual("Task_Num_Bananas", workflow_api.next_task.name)
|
|
|
|
def test_navigation_with_parallel_forms(self):
|
|
self.load_example_data()
|
|
workflow = self.create_workflow('exclusive_gateway')
|
|
|
|
# get the first form in the two form workflow.
|
|
workflow_api = self.get_workflow_api(workflow)
|
|
|
|
self.assertIsNotNone(workflow_api.navigation)
|
|
nav = workflow_api.navigation
|
|
self.assertEquals(5, len(nav))
|
|
self.assertEquals("Do You Have Bananas", nav[0]['title'])
|
|
self.assertEquals("Bananas?", nav[1]['title'])
|
|
self.assertEquals("FUTURE", nav[1]['state'])
|
|
self.assertEquals("yes", nav[2]['title'])
|
|
self.assertEquals("NOOP", nav[2]['state'])
|
|
self.assertEquals("no", nav[3]['title'])
|
|
self.assertEquals("NOOP", nav[3]['state'])
|
|
|
|
def test_navigation_with_exclusive_gateway(self):
|
|
self.load_example_data()
|
|
workflow = self.create_workflow('exclusive_gateway_2')
|
|
|
|
# get the first form in the two form workflow.
|
|
workflow_api = self.get_workflow_api(workflow)
|
|
self.assertIsNotNone(workflow_api.navigation)
|
|
nav = workflow_api.navigation
|
|
self.assertEquals(7, len(nav))
|
|
self.assertEquals("Task 1", nav[0]['title'])
|
|
self.assertEquals("Which Branch?", nav[1]['title'])
|
|
self.assertEquals("a", nav[2]['title'])
|
|
self.assertEquals("Task 2a", nav[3]['title'])
|
|
self.assertEquals("b", nav[4]['title'])
|
|
self.assertEquals("Task 2b", nav[5]['title'])
|
|
self.assertEquals("Task 3", nav[6]['title'])
|
|
|
|
|
|
def test_document_added_to_workflow_shows_up_in_file_list(self):
|
|
self.load_example_data()
|
|
self.create_reference_document()
|
|
workflow = self.create_workflow('docx')
|
|
# get the first form in the two form workflow.
|
|
task = self.get_workflow_api(workflow).next_task
|
|
data = {
|
|
"full_name": "Buck of the Wild",
|
|
"date": "5/1/2020",
|
|
"title": "Leader of the Pack",
|
|
"company": "In the company of wolves",
|
|
"last_name": "Mr. Wolf"
|
|
}
|
|
workflow_api = self.complete_form(workflow, task, data)
|
|
self.assertIsNotNone(workflow_api.next_task)
|
|
self.assertEqual("EndEvent_0evb22x", workflow_api.next_task.name)
|
|
self.assertTrue(workflow_api.status == WorkflowStatus.complete)
|
|
rv = self.app.get('/v1.0/file?workflow_id=%i' % workflow.id, headers=self.logged_in_headers())
|
|
self.assert_success(rv)
|
|
json_data = json.loads(rv.get_data(as_text=True))
|
|
files = FileModelSchema(many=True).load(json_data, session=session)
|
|
self.assertTrue(len(files) == 1)
|
|
# Assure we can still delete the study even when there is a file attached to a workflow.
|
|
rv = self.app.delete('/v1.0/study/%i' % workflow.study_id, headers=self.logged_in_headers())
|
|
self.assert_success(rv)
|
|
|
|
|
|
|
|
def test_get_documentation_populated_in_end(self):
|
|
self.load_example_data()
|
|
workflow = self.create_workflow('random_fact')
|
|
workflow_api = self.get_workflow_api(workflow)
|
|
task = workflow_api.next_task
|
|
self.assertEqual("Task_User_Select_Type", task.name)
|
|
self.assertEqual(3, len(task.form["fields"][0]["options"]))
|
|
self.assertIsNotNone(task.documentation)
|
|
self.complete_form(workflow, workflow_api.next_task, {"type": "norris"})
|
|
workflow_api = self.get_workflow_api(workflow)
|
|
self.assertEqual("EndEvent_0u1cgrf", workflow_api.next_task.name)
|
|
self.assertIsNotNone(workflow_api.next_task.documentation)
|
|
self.assertTrue("norris" in workflow_api.next_task.documentation)
|
|
|
|
def test_load_workflow_from_outdated_spec(self):
|
|
|
|
# Start the basic two_forms workflow and complete a task.
|
|
self.load_example_data()
|
|
workflow = self.create_workflow('two_forms')
|
|
workflow_api = self.get_workflow_api(workflow)
|
|
self.complete_form(workflow, workflow_api.next_task, {"color": "blue"})
|
|
self.assertTrue(workflow_api.is_latest_spec)
|
|
|
|
# Modify the specification, with a major change that alters the flow and can't be deserialized
|
|
# effectively, if it uses the latest spec files.
|
|
file_path = os.path.join(app.root_path, '..', 'tests', 'data', 'two_forms', 'mods', 'two_forms_struc_mod.bpmn')
|
|
self.replace_file("two_forms.bpmn", file_path)
|
|
|
|
workflow_api = self.get_workflow_api(workflow)
|
|
self.assertTrue(workflow_api.spec_version.startswith("v1 "))
|
|
self.assertFalse(workflow_api.is_latest_spec)
|
|
|
|
workflow_api = self.get_workflow_api(workflow, hard_reset=True)
|
|
self.assertTrue(workflow_api.spec_version.startswith("v2 "))
|
|
self.assertTrue(workflow_api.is_latest_spec)
|
|
|
|
# Assure this hard_reset sticks (added this after a bug was found)
|
|
workflow_api = self.get_workflow_api(workflow)
|
|
self.assertTrue(workflow_api.spec_version.startswith("v2 "))
|
|
self.assertTrue(workflow_api.is_latest_spec)
|
|
|
|
def test_soft_reset_errors_out_and_next_result_is_on_original_version(self):
|
|
|
|
# Start the basic two_forms workflow and complete a task.
|
|
self.load_example_data()
|
|
workflow = self.create_workflow('two_forms')
|
|
workflow_api = self.get_workflow_api(workflow)
|
|
self.complete_form(workflow, workflow_api.next_task, {"color": "blue"})
|
|
self.assertTrue(workflow_api.is_latest_spec)
|
|
|
|
# Modify the specification, with a major change that alters the flow and can't be deserialized
|
|
# effectively, if it uses the latest spec files.
|
|
file_path = os.path.join(app.root_path, '..', 'tests', 'data', 'two_forms', 'mods', 'two_forms_struc_mod.bpmn')
|
|
self.replace_file("two_forms.bpmn", file_path)
|
|
|
|
# perform a soft reset returns an error
|
|
rv = self.app.get('/v1.0/workflow/%i?soft_reset=%s&hard_reset=%s' %
|
|
(workflow.id, "true", "false"),
|
|
content_type="application/json",
|
|
headers=self.logged_in_headers())
|
|
self.assert_failure(rv, error_code="unexpected_workflow_structure")
|
|
|
|
# Try again without a soft reset, and we are still ok, and on the original version.
|
|
workflow_api = self.get_workflow_api(workflow)
|
|
self.assertTrue(workflow_api.spec_version.startswith("v1 "))
|
|
self.assertFalse(workflow_api.is_latest_spec)
|
|
|
|
|
|
def test_manual_task_with_external_documentation(self):
|
|
self.load_example_data()
|
|
workflow = self.create_workflow('manual_task_with_external_documentation')
|
|
|
|
# get the first form in the two form workflow.
|
|
task = self.get_workflow_api(workflow).next_task
|
|
workflow_api = self.complete_form(workflow, task, {"name": "Dan"})
|
|
|
|
workflow = self.get_workflow_api(workflow)
|
|
self.assertEquals('Task_Manual_One', workflow.next_task.name)
|
|
self.assertEquals('ManualTask', workflow_api.next_task.type)
|
|
self.assertTrue('Markdown' in workflow_api.next_task.documentation)
|
|
self.assertTrue('Dan' in workflow_api.next_task.documentation)
|
|
|
|
def test_bpmn_extension_properties_are_populated(self):
|
|
self.load_example_data()
|
|
workflow = self.create_workflow('manual_task_with_external_documentation')
|
|
|
|
# get the first form in the two form workflow.
|
|
task = self.get_workflow_api(workflow).next_task
|
|
self.assertEquals("JustAValue", task.properties['JustAKey'])
|
|
|
|
|
|
@patch('crc.services.protocol_builder.requests.get')
|
|
def test_multi_instance_task(self, mock_get):
|
|
# Enable the protocol builder.
|
|
ProtocolBuilderService.ENABLED = True
|
|
|
|
# This depends on getting a list of investigators back from the protocol builder.
|
|
mock_get.return_value.ok = True
|
|
mock_get.return_value.text = self.protocol_builder_response('investigators.json')
|
|
|
|
self.load_example_data()
|
|
workflow = self.create_workflow('multi_instance')
|
|
|
|
# get the first form in the two form workflow.
|
|
workflow = self.get_workflow_api(workflow)
|
|
navigation = self.get_workflow_api(workflow).navigation
|
|
self.assertEquals(4, len(navigation)) # Start task, form_task, multi_task, end task
|
|
self.assertEquals("UserTask", workflow.next_task.type)
|
|
self.assertEquals(MultiInstanceType.sequential.value, workflow.next_task.multi_instance_type)
|
|
self.assertEquals(9, workflow.next_task.multi_instance_count)
|
|
|
|
# Assure that the names for each task are properly updated, so they aren't all the same.
|
|
self.assertEquals("Primary Investigator", workflow.next_task.properties['display_name'])
|
|
|
|
|
|
def test_lookup_endpoint_for_task_field_enumerations(self):
|
|
self.load_example_data()
|
|
workflow = self.create_workflow('enum_options_with_search')
|
|
# get the first form in the two form workflow.
|
|
workflow = self.get_workflow_api(workflow)
|
|
task = workflow.next_task
|
|
field_id = task.form['fields'][0]['id']
|
|
rv = self.app.get('/v1.0/workflow/%i/task/%s/lookup/%s?query=%s&limit=5' %
|
|
(workflow.id, task.id, field_id, 'c'), # All records with a word that starts with 'c'
|
|
headers=self.logged_in_headers(),
|
|
content_type="application/json")
|
|
self.assert_success(rv)
|
|
results = json.loads(rv.get_data(as_text=True))
|
|
self.assertEqual(5, len(results))
|
|
|
|
def test_lookup_endpoint_for_task_ldap_field_lookup(self):
|
|
self.load_example_data()
|
|
workflow = self.create_workflow('ldap_lookup')
|
|
# get the first form
|
|
workflow = self.get_workflow_api(workflow)
|
|
task = workflow.next_task
|
|
field_id = task.form['fields'][0]['id']
|
|
# lb3dp is a user record in the mock ldap responses for tests.
|
|
rv = self.app.get('/v1.0/workflow/%i/task/%s/lookup/%s?query=%s&limit=5' %
|
|
(workflow.id, task.id, field_id, 'lb3dp'),
|
|
headers=self.logged_in_headers(),
|
|
content_type="application/json")
|
|
self.assert_success(rv)
|
|
results = json.loads(rv.get_data(as_text=True))
|
|
self.assertEqual(1, len(results))
|
|
|
|
def test_sub_process(self):
|
|
self.load_example_data()
|
|
workflow = self.create_workflow('subprocess')
|
|
|
|
workflow_api = self.get_workflow_api(workflow)
|
|
navigation = workflow_api.navigation
|
|
task = workflow_api.next_task
|
|
|
|
self.assertEquals(2, len(navigation))
|
|
self.assertEquals("UserTask", task.type)
|
|
self.assertEquals("Activity_A", task.name)
|
|
self.assertEquals("My Sub Process", task.process_name)
|
|
workflow_api = self.complete_form(workflow, task, {"name": "Dan"})
|
|
task = workflow_api.next_task
|
|
self.assertIsNotNone(task)
|
|
|
|
self.assertEquals("Activity_B", task.name)
|
|
self.assertEquals("Sub Workflow Example", task.process_name)
|
|
workflow_api = self.complete_form(workflow, task, {"name": "Dan"})
|
|
self.assertEquals(WorkflowStatus.complete, workflow_api.status)
|
|
|
|
def test_update_task_resets_token(self):
|
|
self.load_example_data()
|
|
workflow = self.create_workflow('exclusive_gateway')
|
|
|
|
# Start the workflow.
|
|
first_task = self.get_workflow_api(workflow).next_task
|
|
self.complete_form(workflow, first_task, {"has_bananas": True})
|
|
workflow = self.get_workflow_api(workflow)
|
|
self.assertEquals('Task_Num_Bananas', workflow.next_task.name)
|
|
|
|
# Trying to re-submit the initial task, and answer differently, should result in an error.
|
|
self.complete_form(workflow, first_task, {"has_bananas": False}, error_code="invalid_state")
|
|
|
|
# Go ahead and set the number of bananas.
|
|
workflow = self.get_workflow_api(workflow)
|
|
task = workflow.next_task
|
|
|
|
self.complete_form(workflow, task, {"num_bananas": 4})
|
|
# We are now at the end of the workflow.
|
|
|
|
# Make the old task the current task.
|
|
rv = self.app.put('/v1.0/workflow/%i/task/%s/set_token' % (workflow.id, first_task.id),
|
|
headers=self.logged_in_headers(),
|
|
content_type="application/json")
|
|
self.assert_success(rv)
|
|
json_data = json.loads(rv.get_data(as_text=True))
|
|
workflow = WorkflowApiSchema().load(json_data)
|
|
|
|
# Assure the Next Task is the one we just reset the token to be on.
|
|
self.assertEquals("Task_Has_Bananas", workflow.next_task.name)
|
|
|
|
# Go ahead and get that workflow one more time, it should still be right.
|
|
workflow = self.get_workflow_api(workflow)
|
|
|
|
# Assure the Next Task is the one we just reset the token to be on.
|
|
self.assertEquals("Task_Has_Bananas", workflow.next_task.name)
|
|
|
|
# The next task should be a different value.
|
|
self.complete_form(workflow, workflow.next_task, {"has_bananas": False})
|
|
workflow = self.get_workflow_api(workflow)
|
|
self.assertEquals('Task_Why_No_Bananas', workflow.next_task.name)
|
|
|
|
@patch('crc.services.protocol_builder.requests.get')
|
|
def test_parallel_multi_instance(self, mock_get):
|
|
|
|
# Assure we get nine investigators back from the API Call, as set in the investigators.json file.
|
|
mock_get.return_value.ok = True
|
|
mock_get.return_value.text = self.protocol_builder_response('investigators.json')
|
|
|
|
|
|
self.load_example_data()
|
|
workflow = self.create_workflow('multi_instance_parallel')
|
|
|
|
workflow_api = self.get_workflow_api(workflow)
|
|
self.assertEquals(12, len(workflow_api.navigation))
|
|
ready_items = [nav for nav in workflow_api.navigation if nav['state'] == "READY"]
|
|
self.assertEquals(9, len(ready_items))
|
|
|
|
self.assertEquals("UserTask", workflow_api.next_task.type)
|
|
self.assertEquals("MutiInstanceTask",workflow_api.next_task.name)
|
|
self.assertEquals("more information", workflow_api.next_task.title)
|
|
|
|
for i in random.sample(range(9), 9):
|
|
task = TaskSchema().load(ready_items[i]['task'])
|
|
self.complete_form(workflow, task, {"investigator":{"email": "dhf8r@virginia.edu"}})
|
|
#tasks = self.get_workflow_api(workflow).user_tasks
|
|
|
|
workflow = self.get_workflow_api(workflow)
|
|
self.assertEquals(WorkflowStatus.complete, workflow.status)
|
|
|