From 0f1ccd7db5e1ee5daf301efad8782889eeb18f10 Mon Sep 17 00:00:00 2001 From: mike cullerton Date: Tue, 30 Nov 2021 08:54:15 -0500 Subject: [PATCH] New script to check workflow status. Includes test and test workflow --- crc/scripts/get_workflow_status.py | 39 +++++++ .../get_workflow_status.bpmn | 101 ++++++++++++++++++ tests/scripts/test_get_workflow_status.py | 35 ++++++ 3 files changed, 175 insertions(+) create mode 100644 crc/scripts/get_workflow_status.py create mode 100644 tests/data/get_workflow_status/get_workflow_status.bpmn create mode 100644 tests/scripts/test_get_workflow_status.py diff --git a/crc/scripts/get_workflow_status.py b/crc/scripts/get_workflow_status.py new file mode 100644 index 00000000..41e0bb43 --- /dev/null +++ b/crc/scripts/get_workflow_status.py @@ -0,0 +1,39 @@ +from crc import session +from crc.api.common import ApiError +from crc.models.workflow import WorkflowModel +from crc.scripts.script import Script + + +class MyScript(Script): + + def get_description(self): + return """ +Get the status of a workflow. +Currently, status is one of "not_started", "user_input_required", "waiting", or "complete". + +You must pass a workflow_id. + +Examples: + status = get_workflow_status(1) + status = get_workflow_status(search_workflow_id=1) +""" + + def do_task_validate_only(self, task, study_id, workflow_id, *args, **kwargs): + return self.do_task(task, study_id, workflow_id, *args, **kwargs) + + def do_task(self, task, study_id, workflow_id, *args, **kwargs): + if 'search_workflow_id' in kwargs.keys() or len(args) > 0: + if 'search_workflow_id' in kwargs.keys(): + search_workflow_id = kwargs['search_workflow_id'] + else: + search_workflow_id = args[0] + workflow_model = session.query(WorkflowModel).filter(WorkflowModel.id == search_workflow_id).first() + if workflow_model: + return workflow_model.status.value + else: + return f'No model found for workflow {search_workflow_id}.' + + else: + raise ApiError.from_task(code='missing_argument', + message='You must include a workflow_id when calling the `get_workflow_status` script.', + task=task) diff --git a/tests/data/get_workflow_status/get_workflow_status.bpmn b/tests/data/get_workflow_status/get_workflow_status.bpmn new file mode 100644 index 00000000..e50b0960 --- /dev/null +++ b/tests/data/get_workflow_status/get_workflow_status.bpmn @@ -0,0 +1,101 @@ + + + + + Flow_0wppzk2 + + + + Flow_0wp4z9u + Flow_0fl7rsj + status_arg = get_workflow_status(search_workflow_id) + + + + # Status Arg +{{ status_arg }} + Flow_0fl7rsj + Flow_00x8h5p + + + + Flow_00x8h5p + Flow_0lt7dwr + status_kwarg = get_workflow_status(search_workflow_id=search_workflow_id) + + + # Status Kwarg +{{ status_kwarg }} + Flow_0lt7dwr + Flow_0eg806h + + + Flow_0eg806h + + + + + + + + + + + + + + + Flow_0wppzk2 + Flow_0wp4z9u + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/tests/scripts/test_get_workflow_status.py b/tests/scripts/test_get_workflow_status.py new file mode 100644 index 00000000..3581d397 --- /dev/null +++ b/tests/scripts/test_get_workflow_status.py @@ -0,0 +1,35 @@ +from tests.base_test import BaseTest + +from crc import session +from crc.models.workflow import WorkflowModel + + +class TestGetWorkflowStatus(BaseTest): + + def test_get_workflow_status_validation(self): + self.load_example_data() + spec_model = self.load_test_spec('get_workflow_status') + rv = self.app.get('/v1.0/workflow-specification/%s/validate' % spec_model.id, headers=self.logged_in_headers()) + self.assertEqual([], rv.json) + + def test_get_workflow_status(self): + self.load_example_data() + workflow_model_1 = session.query(WorkflowModel).filter(WorkflowModel.id == 1).first() + search_workflow_id = workflow_model_1.id + workflow = self.create_workflow('get_workflow_status') + workflow_api = self.get_workflow_api(workflow) + task = workflow_api.next_task + + # calls get_workflow_status(search_workflow_id) + workflow_api = self.complete_form(workflow, task, {'search_workflow_id': search_workflow_id}) + task = workflow_api.next_task + self.assertEqual('Activity_StatusArg', task.name) + self.assertEqual(task.data['status_arg'], workflow_model_1.status.value) + + # calls get_workflow_status(search_workflow_id=search_workflow_id) + workflow_api = self.complete_form(workflow, task, {}) + task = workflow_api.next_task + self.assertEqual('Activity_StatusKwarg', task.name) + self.assertEqual(task.data['status_kwarg'], workflow_model_1.status.value) + +