Add an "update_all" flag to update_task, allowing it to complete all remaining multi-instance tasks with the submitted values.

This lets us remove some fairly complex logic from the front end that wasn't behaving properly.
Dan 2021-03-12 20:41:07 -05:00
parent 1000b22a3b
commit 15c64c862e
8 changed files with 181 additions and 112 deletions


@@ -904,6 +904,12 @@ paths:
description: Terminate the loop on a looping task
schema:
type: boolean
- name: update_all
in: query
required: false
description: In the case of a multi-instance user task, update all tasks with the submitted values.
schema:
type: boolean
put:
operationId: crc.api.workflow.update_task
summary: Exclusively for User Tasks, submits form data as a flat set of key/values.
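
For illustration, the new flag rides on the existing task-data endpoint as a query parameter. A minimal client-side sketch using the requests library, assuming a local server and placeholder ids (the auth header is a stand-in for whatever your deployment uses):

    import requests

    # Placeholder values -- workflow id, task id, and auth are deployment-specific.
    base_url = "http://localhost:5000/v1.0"
    workflow_id = 42
    task_id = "some-task-uuid"
    form_data = {"investigator": {"email": "dhf8r@virginia.edu"}}

    # PUT the form data once; update_all=true asks the API to apply the same
    # values to every remaining instance of the multi-instance task.
    response = requests.put(
        f"{base_url}/workflow/{workflow_id}/task/{task_id}/data",
        params={"update_all": "true"},
        json=form_data,
        headers={"Authorization": "Bearer <token>"},  # placeholder auth
    )
    response.raise_for_status()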


@@ -179,7 +179,7 @@ def set_current_task(workflow_id, task_id):
return WorkflowApiSchema().dump(workflow_api_model)
def update_task(workflow_id, task_id, body, terminate_loop=None):
def update_task(workflow_id, task_id, body, terminate_loop=None, update_all=False):
workflow_model = session.query(WorkflowModel).filter_by(id=workflow_id).first()
if workflow_model is None:
raise ApiError("invalid_workflow_id", "The given workflow id is not valid.", status_code=404)
@@ -191,6 +191,8 @@ def update_task(workflow_id, task_id, body, terminate_loop=None):
task_id = uuid.UUID(task_id)
spiff_task = processor.bpmn_workflow.get_task(task_id)
_verify_user_and_role(processor, spiff_task)
user = UserService.current_user(allow_admin_impersonate=False) # Always log as the real user.
if not spiff_task:
raise ApiError("empty_task", "Processor failed to obtain task.", status_code=404)
if spiff_task.state != spiff_task.READY:
@@ -199,20 +201,35 @@ def update_task(workflow_id, task_id, body, terminate_loop=None):
if terminate_loop:
spiff_task.terminate_loop()
spiff_task.update_data(body)
processor.complete_task(spiff_task)
processor.do_engine_steps()
processor.save()
# Log the action, and any pending task assignments in the event of lanes in the workflow.
user = UserService.current_user(allow_admin_impersonate=False) # Always log as the real user.
WorkflowService.log_task_action(user.uid, processor, spiff_task, WorkflowService.TASK_ACTION_COMPLETE)
# Update the task
__update_task(processor, spiff_task, body, user)
# If we need to update all tasks, get the next ready task, and if it is a multi-instance task with the
# same task spec, complete that form as well.
if update_all:
next_task = processor.next_task()
form_data = WorkflowService().extract_form_data(body, spiff_task)
while next_task and next_task.task_info()["mi_index"] > 0:
__update_task(processor, next_task, form_data, user)
next_task = processor.next_task()
WorkflowService.update_task_assignments(processor)
workflow_api_model = WorkflowService.processor_to_workflow_api(processor)
return WorkflowApiSchema().dump(workflow_api_model)
def __update_task(processor, task, data, user):
"""All the things that need to happen when we complete a form, abstracted
here because we need to do it multiple times when completing all tasks in
a multi-instance task"""
task.update_data(data)
processor.complete_task(task)
processor.do_engine_steps()
processor.save()
WorkflowService.log_task_action(user.uid, processor, task, WorkflowService.TASK_ACTION_COMPLETE)
def list_workflow_spec_categories():
schema = WorkflowSpecCategoryModelSchema(many=True)
return schema.dump(session.query(WorkflowSpecCategoryModel).all())
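
The update_all branch leans on processor.next_task() returning the next READY task, and on task_info()["mi_index"] staying above zero while instances of the multi-instance group remain. A rough sketch of that control flow with stand-in objects (not the real SpiffWorkflow API):

    # Hypothetical stand-ins, just to show the shape of the while loop above.
    class StubTask:
        def __init__(self, mi_index):
            self.mi_index = mi_index

        def task_info(self):
            return {"mi_index": self.mi_index}

    def complete_remaining(ready_tasks):
        """Keep completing tasks while the next ready task still belongs to
        the multi-instance group (mi_index > 0)."""
        completed = []
        queue = list(ready_tasks)
        next_task = queue.pop(0) if queue else None
        while next_task and next_task.task_info()["mi_index"] > 0:
            completed.append(next_task)  # stands in for __update_task(...)
            next_task = queue.pop(0) if queue else None
        return completed

    # Instances 2..5 remain after the first instance was completed normally.
    assert len(complete_remaining([StubTask(i) for i in (2, 3, 4, 5)])) == 4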


@@ -724,7 +724,12 @@ class WorkflowService(object):
elif isinstance(task.task_spec, MultiInstanceTask):
group = task.task_spec.elementVar
if group in latest_data:
data[group] = latest_data[group]
if isinstance(group, int):
data[group] = latest_data[group]
elif field.id in latest_data[group]:
if group not in data:
data[group] = {}
data[group][field.id] = latest_data[group][field.id]
else:
if field.id in latest_data:
data[field.id] = latest_data[field.id]
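
The new elif branch copies only the submitted field back into the element-variable group instead of overwriting the whole group. A standalone sketch of that merge, with illustrative group and field names:

    def merge_group_field(data, latest_data, group, field_id):
        """Copy one field from latest_data[group] into data[group], creating
        the group dict on first use (mirrors the elif branch above)."""
        if group in latest_data and field_id in latest_data[group]:
            data.setdefault(group, {})[field_id] = latest_data[group][field_id]
        return data

    latest = {"investigator": {"email": "pi@example.edu", "role": "PI"}}
    merged = merge_group_field({}, latest, "investigator", "email")
    assert merged == {"investigator": {"email": "pi@example.edu"}}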


@@ -5,7 +5,7 @@ services:
volumes:
- $HOME/docker/volumes/postgres:/var/lib/postgresql/data
ports:
- 5003:5432
- 5432:5432
environment:
- POSTGRES_USER=${DB_USER}
- POSTGRES_PASSWORD=${DB_PASS}
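
For context, entries under ports in docker-compose are host:container, so this change publishes the container's Postgres (5432) on the host's default port 5432 rather than 5003, letting local tools connect without a port override.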


@@ -68,24 +68,24 @@ class BaseTest(unittest.TestCase):
studies = [
{
'id':0,
'title':'The impact of fried pickles on beer consumption in bipedal software developers.',
'last_updated':datetime.datetime.now(),
'status':StudyStatus.in_progress,
'primary_investigator_id':'dhf8r',
'sponsor':'Sartography Pharmaceuticals',
'ind_number':'1234',
'user_uid':'dhf8r'
'id': 0,
'title': 'The impact of fried pickles on beer consumption in bipedal software developers.',
'last_updated': datetime.datetime.now(),
'status': StudyStatus.in_progress,
'primary_investigator_id': 'dhf8r',
'sponsor': 'Sartography Pharmaceuticals',
'ind_number': '1234',
'user_uid': 'dhf8r'
},
{
'id':1,
'title':'Requirement of hippocampal neurogenesis for the behavioral effects of soft pretzels',
'last_updated':datetime.datetime.now(),
'status':StudyStatus.in_progress,
'primary_investigator_id':'dhf8r',
'sponsor':'Makerspace & Co.',
'ind_number':'5678',
'user_uid':'dhf8r'
'id': 1,
'title': 'Requirement of hippocampal neurogenesis for the behavioral effects of soft pretzels',
'last_updated': datetime.datetime.now(),
'status': StudyStatus.in_progress,
'primary_investigator_id': 'dhf8r',
'sponsor': 'Makerspace & Co.',
'ind_number': '5678',
'user_uid': 'dhf8r'
}
]
@@ -141,7 +141,6 @@ class BaseTest(unittest.TestCase):
session.execute("delete from workflow; delete from file_data; delete from file; delete from workflow_spec;")
session.commit()
def load_example_data(self, use_crc_data=False, use_rrt_data=False):
"""use_crc_data will cause this to load the mammoth collection of documents
we built up developing crc, use_rrt_data will do the same for the rrt project,
@@ -219,7 +218,6 @@ class BaseTest(unittest.TestCase):
data = myfile.read()
return data
def assert_success(self, rv, msg=""):
try:
data = json.loads(rv.get_data(as_text=True))
@@ -361,7 +359,7 @@ class BaseTest(unittest.TestCase):
def get_workflow_api(self, workflow, do_engine_steps=True, user_uid="dhf8r"):
user = session.query(UserModel).filter_by(uid=user_uid).first()
self.assertIsNotNone(user)
url = (f'/v1.0/workflow/{workflow.id}'
url = (f'/v1.0/workflow/{workflow.id}'
f'?do_engine_steps={str(do_engine_steps)}')
workflow_api = self.get_workflow_common(url, user)
self.assertEqual(workflow.workflow_spec_id, workflow_api.workflow_spec_id)
@@ -370,13 +368,14 @@ class BaseTest(unittest.TestCase):
def restart_workflow_api(self, workflow, clear_data=False, user_uid="dhf8r"):
user = session.query(UserModel).filter_by(uid=user_uid).first()
self.assertIsNotNone(user)
url = (f'/v1.0/workflow/{workflow.id}/restart'
url = (f'/v1.0/workflow/{workflow.id}/restart'
f'?clear_data={str(clear_data)}')
workflow_api = self.get_workflow_common(url, user)
self.assertEqual(workflow.workflow_spec_id, workflow_api.workflow_spec_id)
return workflow_api
def complete_form(self, workflow_in, task_in, dict_data, error_code=None, terminate_loop=None, user_uid="dhf8r"):
def complete_form(self, workflow_in, task_in, dict_data, update_all=False, error_code=None, terminate_loop=None,
user_uid="dhf8r"):
prev_completed_task_count = workflow_in.completed_tasks
if isinstance(task_in, dict):
task_id = task_in["id"]
@@ -385,16 +384,16 @@ class BaseTest(unittest.TestCase):
user = session.query(UserModel).filter_by(uid=user_uid).first()
self.assertIsNotNone(user)
args = ""
if terminate_loop:
rv = self.app.put('/v1.0/workflow/%i/task/%s/data?terminate_loop=true' % (workflow_in.id, task_id),
headers=self.logged_in_headers(user=user),
content_type="application/json",
data=json.dumps(dict_data))
else:
rv = self.app.put('/v1.0/workflow/%i/task/%s/data' % (workflow_in.id, task_id),
headers=self.logged_in_headers(user=user),
content_type="application/json",
data=json.dumps(dict_data))
args += "?terminate_loop=true"
if update_all:
args += "&update_all=true" if args else "?update_all=true"
rv = self.app.put('/v1.0/workflow/%i/task/%s/data%s' % (workflow_in.id, task_id, args),
headers=self.logged_in_headers(user=user),
content_type="application/json",
data=json.dumps(dict_data))
if error_code:
self.assert_failure(rv, error_code=error_code)
return
@@ -408,7 +407,7 @@ class BaseTest(unittest.TestCase):
# branches may be pruned. As we hit parallel Multi-Instance new tasks may be created...
self.assertIsNotNone(workflow.total_tasks)
# presumably, we also need to deal with sequential items here too . .
if not task_in.multi_instance_type == 'looping':
if not task_in.multi_instance_type == 'looping' and not update_all:
self.assertEqual(prev_completed_task_count + 1, workflow.completed_tasks)
# Assure a record exists in the Task Events


@@ -1,13 +1,7 @@
from unittest.mock import patch
from crc import session
from crc.models.api_models import MultiInstanceType
from crc.models.study import StudyModel
from crc.models.workflow import WorkflowStatus
from tests.base_test import BaseTest
from crc.services.study_service import StudyService
from crc.services.workflow_processor import WorkflowProcessor
from crc.services.workflow_service import WorkflowService
from tests.base_test import BaseTest
class TestWorkflowProcessorLoopingTask(BaseTest):


@@ -0,0 +1,111 @@
import json
import random
from unittest.mock import patch
from tests.base_test import BaseTest
from crc import session, app
from crc.models.api_models import WorkflowApiSchema, MultiInstanceType
from crc.models.workflow import WorkflowStatus
from example_data import ExampleDataLoader
class TestMultiinstanceTasksApi(BaseTest):
@patch('crc.services.protocol_builder.requests.get')
def test_multi_instance_task(self, mock_get):
ExampleDataLoader().load_reference_documents()
# Enable the protocol builder.
app.config['PB_ENABLED'] = True
# This depends on getting a list of investigators back from the protocol builder.
mock_get.return_value.ok = True
mock_get.return_value.text = self.protocol_builder_response('investigators.json')
workflow = self.create_workflow('multi_instance')
# get the first form in the two form workflow.
workflow = self.get_workflow_api(workflow)
navigation = self.get_workflow_api(workflow).navigation
self.assertEqual(5, len(navigation)) # Start task, form_task, multi_task, end task
self.assertEqual("UserTask", workflow.next_task.type)
self.assertEqual(MultiInstanceType.sequential.value, workflow.next_task.multi_instance_type)
self.assertEqual(5, workflow.next_task.multi_instance_count)
# Assure that the names for each task are properly updated, so they aren't all the same.
self.assertEqual("Primary Investigator", workflow.next_task.title)
@patch('crc.services.protocol_builder.requests.get')
def test_parallel_multi_instance(self, mock_get):
# Assure we get nine investigators back from the API Call, as set in the investigators.json file.
app.config['PB_ENABLED'] = True
mock_get.return_value.ok = True
mock_get.return_value.text = self.protocol_builder_response('investigators.json')
ExampleDataLoader().load_reference_documents()
workflow = self.create_workflow('multi_instance_parallel')
workflow_api = self.get_workflow_api(workflow)
self.assertEqual(9, len(workflow_api.navigation))
ready_items = [nav for nav in workflow_api.navigation if nav.state == "READY"]
self.assertEqual(5, len(ready_items))
self.assertEqual("UserTask", workflow_api.next_task.type)
self.assertEqual("MultiInstanceTask",workflow_api.next_task.name)
self.assertEqual("Primary Investigator", workflow_api.next_task.title)
for i in random.sample(range(5), 5):
task_id = ready_items[i].task_id
rv = self.app.put('/v1.0/workflow/%i/task/%s/set_token' % (workflow.id, task_id),
headers=self.logged_in_headers(),
content_type="application/json")
self.assert_success(rv)
json_data = json.loads(rv.get_data(as_text=True))
workflow = WorkflowApiSchema().load(json_data)
data = workflow.next_task.data
data['investigator']['email'] = "dhf8r@virginia.edu"
self.complete_form(workflow, workflow.next_task, data)
#tasks = self.get_workflow_api(workflow).user_tasks
workflow = self.get_workflow_api(workflow)
self.assertEqual(WorkflowStatus.complete, workflow.status)
@patch('crc.services.protocol_builder.requests.get')
def test_parallel_multi_instance_update_all(self, mock_get):
# Assure we get nine investigators back from the API Call, as set in the investigators.json file.
app.config['PB_ENABLED'] = True
mock_get.return_value.ok = True
mock_get.return_value.text = self.protocol_builder_response('investigators.json')
ExampleDataLoader().load_reference_documents()
workflow = self.create_workflow('multi_instance_parallel')
workflow_api = self.get_workflow_api(workflow)
self.assertEqual(9, len(workflow_api.navigation))
ready_items = [nav for nav in workflow_api.navigation if nav.state == "READY"]
self.assertEqual(5, len(ready_items))
self.assertEqual("UserTask", workflow_api.next_task.type)
self.assertEqual("MultiInstanceTask",workflow_api.next_task.name)
self.assertEqual("Primary Investigator", workflow_api.next_task.title)
data = workflow_api.next_task.data
data['investigator']['email'] = "dhf8r@virginia.edu"
self.complete_form(workflow, workflow_api.next_task, data, update_all=True)
workflow = self.get_workflow_api(workflow)
self.assertEqual(WorkflowStatus.complete, workflow.status)
data = workflow.next_task.data
for key in data["StudyInfo"]["investigators"]:
self.assertEquals("dhf8r@virginia.edu", data["StudyInfo"]["investigators"][key]['email'])


@@ -256,30 +256,6 @@ class TestTasksApi(BaseTest):
self.assertEqual("JustAValue", task.properties['JustAKey'])
@patch('crc.services.protocol_builder.requests.get')
def test_multi_instance_task(self, mock_get):
self.load_example_data()
# Enable the protocol builder.
app.config['PB_ENABLED'] = True
# This depends on getting a list of investigators back from the protocol builder.
mock_get.return_value.ok = True
mock_get.return_value.text = self.protocol_builder_response('investigators.json')
workflow = self.create_workflow('multi_instance')
# get the first form in the two form workflow.
workflow = self.get_workflow_api(workflow)
navigation = self.get_workflow_api(workflow).navigation
self.assertEqual(5, len(navigation)) # Start task, form_task, multi_task, end task
self.assertEqual("UserTask", workflow.next_task.type)
self.assertEqual(MultiInstanceType.sequential.value, workflow.next_task.multi_instance_type)
self.assertEqual(5, workflow.next_task.multi_instance_count)
# Assure that the names for each task are properly updated, so they aren't all the same.
self.assertEqual("Primary Investigator", workflow.next_task.title)
def test_lookup_endpoint_for_task_field_enumerations(self):
workflow = self.create_workflow('enum_options_with_search')
@@ -445,42 +421,3 @@ class TestTasksApi(BaseTest):
workflow = self.get_workflow_api(workflow)
self.assertEqual('Task_Why_No_Bananas', workflow.next_task.name)
@patch('crc.services.protocol_builder.requests.get')
def test_parallel_multi_instance(self, mock_get):
# Assure we get nine investigators back from the API Call, as set in the investigators.json file.
app.config['PB_ENABLED'] = True
mock_get.return_value.ok = True
mock_get.return_value.text = self.protocol_builder_response('investigators.json')
self.load_example_data()
workflow = self.create_workflow('multi_instance_parallel')
workflow_api = self.get_workflow_api(workflow)
self.assertEqual(9, len(workflow_api.navigation))
ready_items = [nav for nav in workflow_api.navigation if nav.state == "READY"]
self.assertEqual(5, len(ready_items))
self.assertEqual("UserTask", workflow_api.next_task.type)
self.assertEqual("MultiInstanceTask",workflow_api.next_task.name)
self.assertEqual("Primary Investigator", workflow_api.next_task.title)
for i in random.sample(range(5), 5):
task_id = ready_items[i].task_id
rv = self.app.put('/v1.0/workflow/%i/task/%s/set_token' % (workflow.id, task_id),
headers=self.logged_in_headers(),
content_type="application/json")
self.assert_success(rv)
json_data = json.loads(rv.get_data(as_text=True))
workflow = WorkflowApiSchema().load(json_data)
data = workflow.next_task.data
data['investigator']['email'] = "dhf8r@virginia.edu"
self.complete_form(workflow, workflow.next_task, data)
#tasks = self.get_workflow_api(workflow).user_tasks
workflow = self.get_workflow_api(workflow)
self.assertEqual(WorkflowStatus.complete, workflow.status)