Merge pull request #550 from sartography/chore/unlock-admin-sandbox-739

Chore/unlock admin sandbox #739
This commit is contained in:
Dan Funk 2022-06-03 14:14:56 -04:00 committed by GitHub
commit 3cc5013484
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 55 additions and 9 deletions

View File

@ -6,7 +6,7 @@ from marshmallow import INCLUDE
from marshmallow_enum import EnumField
from crc import ma
from crc.models.workflow import WorkflowStatus
from crc.models.workflow import WorkflowStatus, WorkflowModel
from crc.models.file import FileSchema
class MultiInstanceType(enum.Enum):
@ -213,7 +213,7 @@ class DocumentDirectory(object):
class WorkflowApi(object):
def __init__(self, id, status, next_task, navigation,
workflow_spec_id, total_tasks, completed_tasks,
last_updated, is_review, title, study_id, state):
last_updated, is_review, title, study_id, state, is_admin_workflow=False):
self.id = id
self.status = status
self.next_task = next_task # The next task that requires user input.
@ -226,6 +226,7 @@ class WorkflowApi(object):
self.is_review = is_review
self.study_id = study_id or ''
self.state = state
self.is_admin_workflow = is_admin_workflow
class WorkflowApiSchema(ma.Schema):
@ -233,7 +234,7 @@ class WorkflowApiSchema(ma.Schema):
model = WorkflowApi
fields = ["id", "status", "next_task", "navigation",
"workflow_spec_id", "total_tasks", "completed_tasks",
"last_updated", "is_review", "title", "study_id", "state"]
"last_updated", "is_review", "title", "study_id", "state", "is_admin_workflow"]
unknown = INCLUDE
status = EnumField(WorkflowStatus)
@ -245,7 +246,7 @@ class WorkflowApiSchema(ma.Schema):
def make_workflow(self, data, **kwargs):
keys = ['id', 'status', 'next_task', 'navigation',
'workflow_spec_id', "total_tasks", "completed_tasks",
"last_updated", "is_review", "title", "study_id", "state"]
"last_updated", "is_review", "title", "study_id", "state", "is_admin_workflow"]
filtered_fields = {key: data[key] for key in keys}
filtered_fields['next_task'] = TaskSchema().make_task(data['next_task'])
return WorkflowApi(**filtered_fields)

View File

@ -664,6 +664,7 @@ class WorkflowService(object):
WorkflowService.update_navigation(navigation, processor)
spec_service = WorkflowSpecService()
spec = spec_service.get_spec(processor.workflow_spec_id)
is_admin_workflow = WorkflowService.is_admin_workflow(processor.workflow_spec_id)
workflow_api = WorkflowApi(
id=processor.get_workflow_id(),
status=processor.get_status(),
@ -676,7 +677,8 @@ class WorkflowService(object):
is_review=spec.is_review,
title=spec.display_name,
study_id=processor.workflow_model.study_id or None,
state=processor.workflow_model.state
state=processor.workflow_model.state,
is_admin_workflow=is_admin_workflow
)
if not next_task: # The Next Task can be requested to be a certain task, useful for parallel tasks.
# This may or may not work, sometimes there is no next task to complete.
@ -1151,3 +1153,15 @@ class WorkflowService(object):
session.add(workflow)
session.commit()
@staticmethod
def get_workflow_spec_category(workflow_spec_id):
workflow_spec = WorkflowSpecService().get_spec(workflow_spec_id)
category_id = workflow_spec.category_id
category = WorkflowSpecService().get_category(category_id)
return category
@staticmethod
def is_admin_workflow(workflow_spec_id):
category = WorkflowService.get_workflow_spec_category(workflow_spec_id)
return category.admin

View File

@ -169,12 +169,14 @@ class BaseTest(unittest.TestCase):
self.workflow_spec_service.add_category(category)
return category
def assure_category_exists(self, category_id=None):
def assure_category_exists(self, category_id=None, display_name="Test Workflows", admin=False):
category = None
if category_id is not None:
category = self.workflow_spec_service.get_category(category_id)
if category is None:
category = WorkflowSpecCategory(id="test_category", display_name="Test Workflows", admin=False, display_order=0)
if category_id is None:
category_id = 'test_category'
category = WorkflowSpecCategory(id=category_id, display_name=display_name, admin=admin, display_order=0)
self.workflow_spec_service.add_category(category)
return category

View File

@ -15,8 +15,6 @@ from crc.models.task_event import TaskEventModel
from crc.models.study import StudyEvent, StudyModel, StudySchema, StudyStatus, StudyEventType, StudyAssociated
from crc.models.workflow import WorkflowModel
from crc.services.workflow_processor import WorkflowProcessor
from crc.services.workflow_service import WorkflowService
from crc.services.workflow_spec_service import WorkflowSpecService
from crc.services.user_file_service import UserFileService

View File

@ -1,6 +1,7 @@
from tests.base_test import BaseTest
from crc import session
from crc.models.study import StudyStatus
from crc.models.task_event import TaskEventModel
from crc.models.user import UserModel
from crc.services.workflow_service import WorkflowService
@ -132,4 +133,34 @@ class TestWorkflowApi(BaseTest):
spec1 = self.workflow_spec_service.get_spec('hello_world')
self.assertNotIn('hello_world_lib', spec1.libraries)
def test_workflow_api_model(self):
"""Create 2 workflow specs, one of them in an admin sandbox
Make sure we pass the correct information through the api """
self.assure_category_exists('test_admin_category', admin=True)
self.load_test_spec('simple_form')
self.load_test_spec('hello_world', category_id='test_admin_category')
simple_form_workflow = self.create_workflow('simple_form')
hello_world_workflow = self.create_workflow('hello_world')
# Make sure both workflows use the same study
self.assertEqual(hello_world_workflow.study.id, simple_form_workflow.study.id)
study = hello_world_workflow.study
# Make sure the study status is in_progress
self.assertEqual(StudyStatus.in_progress, study.status)
rv_simple_form_1 = self.app.get('/v1.0/workflow/%s' % simple_form_workflow.id,
follow_redirects=True,
content_type="application/json",
headers=self.logged_in_headers())
self.assert_success(rv_simple_form_1)
json_data_simple_1 = json.loads(rv_simple_form_1.get_data(as_text=True))
self.assertFalse(json_data_simple_1['is_admin_workflow'])
rv_hello_world_1 = self.app.get('/v1.0/workflow/%s' % hello_world_workflow.id,
follow_redirects=True,
content_type="application/json",
headers=self.logged_in_headers())
self.assert_success(rv_hello_world_1)
json_data_hello_1 = json.loads(rv_hello_world_1.get_data(as_text=True))
self.assertTrue(json_data_hello_1['is_admin_workflow'])