Merge pull request #225 from sartography/dev

dev to testing
This commit is contained in:
Dan Funk 2021-01-20 17:49:40 -05:00 committed by GitHub
commit c60754155f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 175 additions and 115 deletions

View File

@ -64,6 +64,44 @@ it should always be visible in the Docker Desktop Dashboard with a friendly little
stop button for your clicking pleasure.
### Environment Setup Part #2
If you want to run CR-Connect in development mode you must have the following two services running:
1. `Postgres Docker`: There is a sub-directory called Postgres that contains a docker image that will set up an empty
database for CR-Connect, and for Protocol Builder Mock, mentioned below. For most systems, you can 'cd' into this
directory and just run start.sh to fire up the Postgres service, and stop.sh to shut it back down again.
create .env file in /postgres with the following two lines in it:
```
DB_USER=crc_user
DB_PASS=crc_pass
```
With this in place, from the command line:
```bash
cd postgres
./start.sh
```
You can now build the database structure in the newly created database with the following lines
```bash
cd .. (into base directory)
flask db upgrade
flask load-example-data (this creates some basic workflows for you to use)
```
2. `Protocol Builder Mock`: We created a mock of the Protocol Builder, a critical service at UVA that is a deep
dependency for CR-Connect. You can find the details here: [Protocol Builder Mock](https://github.com/sartography/protocol-builder-mock)
Be sure this is up and running on Port 5002 or you will encounter errors when the system starts up.
With Protocol Builder Mock up and running, visit http://localhost:5001 and create a study. Set the user
and primary investigator to dhf8r - which is a user in the mock ldap service, and this will later show up when you
fire up the interface.
### Configuration
1. `instance/config.py`: This will
### Project Initialization
1. Clone this repository.
2. In PyCharm:

View File

@ -850,20 +850,8 @@ paths:
format: int32
get:
operationId: crc.api.workflow.get_workflow
summary: Returns a workflow, can also be used to do a soft or hard reset on the workflow.
summary: Returns a workflow.
parameters:
- name: soft_reset
in: query
required: false
description: Set this to true to use the latest workflow specification to load minor modifications to the spec.
schema:
type: boolean
- name: hard_reset
in: query
required: false
description: Set this to true to reset the workflow
schema:
type: boolean
- name: do_engine_steps
in: query
required: false
@ -889,6 +877,35 @@ paths:
responses:
'204':
description: The workflow was removed
/workflow/{workflow_id}/restart:
parameters:
- name: workflow_id
in: path
required: true
description: The id of the workflow
schema:
type: integer
format: int32
get:
operationId: crc.api.workflow.restart_workflow
summary: Restarts a workflow with the latest spec. Can also clear data.
parameters:
- name: clear_data
in: query
required: false
description: Set this to true to clear data when starting workflow.
schema:
type: boolean
tags:
- Workflows and Tasks
responses:
'200':
description: Returns updated workflow, possibly without data.
content:
application/json:
schema:
$ref: "#/components/schemas/Workflow"
/workflow/{workflow_id}/task/{task_id}/data:
parameters:
- name: workflow_id

View File

@ -96,30 +96,29 @@ def delete_workflow_specification(spec_id):
session.commit()
def get_workflow(workflow_id, soft_reset=False, hard_reset=False, do_engine_steps=True):
"""Soft reset will attempt to update to the latest spec without starting over,
Hard reset will update to the latest spec and start from the beginning.
Read Only will return the workflow in a read only state, without running any
engine tasks or logging any events. """
def get_workflow(workflow_id, do_engine_steps=True):
"""Retrieve workflow based on workflow_id, and return it in the last saved State.
If do_engine_steps is False, return the workflow without running any engine tasks or logging any events. """
workflow_model: WorkflowModel = session.query(WorkflowModel).filter_by(id=workflow_id).first()
processor = WorkflowProcessor(workflow_model)
if soft_reset or hard_reset:
try:
processor.cancel_notify()
except Exception as e:
raise e
finally:
# In the event of a reset, ALWAYS allow the reset, even if the cancel_notify fails for some reason.
processor = WorkflowProcessor(workflow_model, soft_reset=soft_reset, hard_reset=hard_reset)
if do_engine_steps:
processor.do_engine_steps()
processor.save()
WorkflowService.update_task_assignments(processor)
workflow_api_model = WorkflowService.processor_to_workflow_api(processor)
return WorkflowApiSchema().dump(workflow_api_model)
def restart_workflow(workflow_id, clear_data=False):
    """Restart the given workflow using the latest spec.

    When clear_data is True, previously collected workflow data is discarded
    before the restart."""
    model: WorkflowModel = session.query(WorkflowModel).filter_by(id=workflow_id).first()
    WorkflowProcessor.reset(model, clear_data=clear_data)
    return get_workflow(model.id)
def get_task_events(action = None, workflow = None, study = None):
"""Provides a way to see a history of what has happened, or get a list of tasks that need your attention."""
query = session.query(TaskEventModel).filter(TaskEventModel.user_uid == g.user.uid)

View File

@ -21,6 +21,7 @@ import crc
from crc import session, app
from crc.api.common import ApiError
from crc.models.file import FileDataModel, FileModel, FileType
from crc.models.task_event import TaskEventModel
from crc.models.workflow import WorkflowStatus, WorkflowModel, WorkflowSpecDependencyFile
from crc.scripts.script import Script
from crc.services.file_service import FileService
@ -150,19 +151,12 @@ class WorkflowProcessor(object):
STUDY_ID_KEY = "study_id"
VALIDATION_PROCESS_KEY = "validate_only"
def __init__(self, workflow_model: WorkflowModel,
soft_reset=False, hard_reset=False, validate_only=False):
"""Create a Workflow Processor based on the serialized information available in the workflow model.
If soft_reset is set to true, it will try to use the latest version of the workflow specification
without resetting to the beginning of the workflow. This will work for some minor changes to the spec.
If hard_reset is set to true, it will use the latest spec, and start the workflow over from the beginning.
which should work in casees where a soft reset fails.
If neither flag is set, it will use the same version of the specification that was used to originally
create the workflow model. """
def __init__(self, workflow_model: WorkflowModel, validate_only=False):
"""Create a Workflow Processor based on the serialized information available in the workflow model."""
self.workflow_model = workflow_model
if soft_reset or len(workflow_model.dependencies) == 0: # Depenencies of 0 means the workflow was never started.
if workflow_model.bpmn_workflow_json is None: # The workflow was never started.
self.spec_data_files = FileService.get_spec_data_files(
workflow_spec_id=workflow_model.workflow_spec_id)
spec = self.get_spec(self.spec_data_files, workflow_model.workflow_spec_id)
@ -187,22 +181,14 @@ class WorkflowProcessor(object):
# can then load data as needed.
self.bpmn_workflow.data[WorkflowProcessor.WORKFLOW_ID_KEY] = workflow_model.id
workflow_model.bpmn_workflow_json = WorkflowProcessor._serializer.serialize_workflow(
self.bpmn_workflow,include_spec=True)
self.bpmn_workflow, include_spec=True)
self.save()
except MissingSpecError as ke:
raise ApiError(code="unexpected_workflow_structure",
message="Failed to deserialize workflow"
" '%s' version %s, due to a mis-placed or missing task '%s'" %
(self.workflow_spec_id, self.get_version_string(), str(ke)) +
" This is very likely due to a soft reset where there was a structural change.")
if hard_reset:
# Now that the spec is loaded, get the data and rebuild the bpmn with the new details
self.hard_reset()
workflow_model.bpmn_workflow_json = WorkflowProcessor._serializer.serialize_workflow(self.bpmn_workflow)
self.save()
if soft_reset:
self.save()
(self.workflow_spec_id, self.get_version_string(), str(ke)))
# set whether this is the latest spec file.
if self.spec_data_files == FileService.get_spec_data_files(workflow_spec_id=workflow_model.workflow_spec_id):
@ -210,6 +196,21 @@ class WorkflowProcessor(object):
else:
self.is_latest_spec = False
@classmethod
def reset(cls, workflow_model, clear_data=False):
    """Reset the workflow so it restarts from the beginning on the latest spec.

    Discards the serialized BPMN state so the next processor load rebuilds the
    workflow from scratch.  When clear_data is True, also wipes the form data
    recorded on this workflow's task events, so the restarted workflow has no
    memory of previously entered values.  Returns a fresh processor for the
    (now reset) workflow model.
    """
    # NOTE(review): removed leftover debug print that logged on every reset.
    workflow_model.bpmn_workflow_json = None
    if clear_data:
        # Clear form_data from task_events
        task_events = session.query(TaskEventModel). \
            filter(TaskEventModel.workflow_id == workflow_model.id).all()
        for task_event in task_events:
            task_event.form_data = {}
            session.add(task_event)
    session.commit()
    return cls(workflow_model)
def __get_bpmn_workflow(self, workflow_model: WorkflowModel, spec: WorkflowSpec, validate_only=False):
if workflow_model.bpmn_workflow_json:
bpmn_workflow = self._serializer.deserialize_workflow(workflow_model.bpmn_workflow_json,
@ -355,18 +356,18 @@ class WorkflowProcessor(object):
else:
return WorkflowStatus.waiting
def hard_reset(self):
"""Recreate this workflow. This will be useful when a workflow specification changes.
"""
self.spec_data_files = FileService.get_spec_data_files(workflow_spec_id=self.workflow_spec_id)
new_spec = WorkflowProcessor.get_spec(self.spec_data_files, self.workflow_spec_id)
new_bpmn_workflow = BpmnWorkflow(new_spec, script_engine=self._script_engine)
new_bpmn_workflow.data = self.bpmn_workflow.data
try:
new_bpmn_workflow.do_engine_steps()
except WorkflowException as we:
raise ApiError.from_task_spec("hard_reset_engine_steps_error", str(we), we.sender)
self.bpmn_workflow = new_bpmn_workflow
# def hard_reset(self):
# """Recreate this workflow. This will be useful when a workflow specification changes.
# """
# self.spec_data_files = FileService.get_spec_data_files(workflow_spec_id=self.workflow_spec_id)
# new_spec = WorkflowProcessor.get_spec(self.spec_data_files, self.workflow_spec_id)
# new_bpmn_workflow = BpmnWorkflow(new_spec, script_engine=self._script_engine)
# new_bpmn_workflow.data = self.bpmn_workflow.data
# try:
# new_bpmn_workflow.do_engine_steps()
# except WorkflowException as we:
# raise ApiError.from_task_spec("hard_reset_engine_steps_error", str(we), we.sender)
# self.bpmn_workflow = new_bpmn_workflow
def get_status(self):
return self.status_of(self.bpmn_workflow)

View File

@ -341,18 +341,30 @@ class BaseTest(unittest.TestCase):
session.commit()
return approval
def get_workflow_api(self, workflow, soft_reset=False, hard_reset=False, do_engine_steps=True, user_uid="dhf8r"):
user = session.query(UserModel).filter_by(uid=user_uid).first()
self.assertIsNotNone(user)
rv = self.app.get(f'/v1.0/workflow/{workflow.id}'
f'?soft_reset={str(soft_reset)}'
f'&hard_reset={str(hard_reset)}'
f'&do_engine_steps={str(do_engine_steps)}',
def get_workflow_common(self, url, user):
    """GET the given workflow url as user; assert success and return the parsed WorkflowApi."""
    response = self.app.get(url,
                            headers=self.logged_in_headers(user),
                            content_type="application/json")
    self.assert_success(response)
    payload = json.loads(response.get_data(as_text=True))
    return WorkflowApiSchema().load(payload)
def get_workflow_api(self, workflow, do_engine_steps=True, user_uid="dhf8r"):
    """Fetch the workflow through the API, optionally running engine steps first."""
    user = session.query(UserModel).filter_by(uid=user_uid).first()
    self.assertIsNotNone(user)
    workflow_api = self.get_workflow_common(
        f'/v1.0/workflow/{workflow.id}?do_engine_steps={str(do_engine_steps)}', user)
    self.assertEqual(workflow.workflow_spec_id, workflow_api.workflow_spec_id)
    return workflow_api
def restart_workflow_api(self, workflow, clear_data=False, user_uid="dhf8r"):
    """Restart the workflow through the API, optionally clearing its data."""
    user = session.query(UserModel).filter_by(uid=user_uid).first()
    self.assertIsNotNone(user)
    workflow_api = self.get_workflow_common(
        f'/v1.0/workflow/{workflow.id}/restart?clear_data={str(clear_data)}', user)
    self.assertEqual(workflow.workflow_spec_id, workflow_api.workflow_spec_id)
    return workflow_api

View File

@ -53,7 +53,9 @@ class TestLookupService(BaseTest):
file.close()
# restart the workflow, so it can pick up the changes.
WorkflowProcessor(workflow, soft_reset=True)
processor = WorkflowProcessor.reset(workflow)
workflow = processor.workflow_model
LookupService.lookup(workflow, "sponsor", "sam", limit=10)
lookup_records = session.query(LookupFileModel).all()

View File

@ -204,7 +204,7 @@ class TestTasksApi(BaseTest):
self.assertTrue(workflow_api.spec_version.startswith("v1 "))
self.assertFalse(workflow_api.is_latest_spec)
workflow_api = self.get_workflow_api(workflow, hard_reset=True)
workflow_api = self.restart_workflow_api(workflow_api, clear_data=True)
self.assertTrue(workflow_api.spec_version.startswith("v2 "))
self.assertTrue(workflow_api.is_latest_spec)
@ -213,30 +213,6 @@ class TestTasksApi(BaseTest):
self.assertTrue(workflow_api.spec_version.startswith("v2 "))
self.assertTrue(workflow_api.is_latest_spec)
def test_soft_reset_errors_out_and_next_result_is_on_original_version(self):
# Start the basic two_forms workflow and complete a task.
workflow = self.create_workflow('two_forms')
workflow_api = self.get_workflow_api(workflow)
self.complete_form(workflow, workflow_api.next_task, {"color": "blue"})
self.assertTrue(workflow_api.is_latest_spec)
# Modify the specification, with a major change that alters the flow and can't be deserialized
# effectively, if it uses the latest spec files.
file_path = os.path.join(app.root_path, '..', 'tests', 'data', 'two_forms', 'modified', 'two_forms_struc_mod.bpmn')
self.replace_file("two_forms.bpmn", file_path)
# perform a soft reset returns an error
rv = self.app.get('/v1.0/workflow/%i?soft_reset=%s&hard_reset=%s' %
(workflow.id, "true", "false"),
content_type="application/json",
headers=self.logged_in_headers())
self.assert_failure(rv, error_code="unexpected_workflow_structure")
# Try again without a soft reset, and we are still ok, and on the original version.
workflow_api = self.get_workflow_api(workflow)
self.assertTrue(workflow_api.spec_version.startswith("v1 "))
self.assertFalse(workflow_api.is_latest_spec)
def test_manual_task_with_external_documentation(self):
workflow = self.create_workflow('manual_task_with_external_documentation')

View File

@ -228,7 +228,8 @@ class TestTasksApi(BaseTest):
self.assertEqual(1, len(self.get_assignment_task_events(supervisor.uid)))
# Resetting the workflow at this point should clear the event log.
workflow_api = self.get_workflow_api(workflow, hard_reset=True, user_uid=submitter.uid)
workflow_api = self.restart_workflow_api(workflow, user_uid=submitter.uid)
workflow_api = self.get_workflow_api(workflow, user_uid=submitter.uid)
self.assertEqual(0, len(self.get_assignment_task_events(supervisor.uid)))
# Re-complete first task, and awaiting tasks should shift to 0 for for submitter, and 1 for supervisor

View File

@ -173,28 +173,6 @@ class TestWorkflowProcessor(BaseTest):
self.assertEqual("workflow_validation_error", context.exception.code)
self.assertTrue("bpmn:startEvent" in context.exception.message)
def test_workflow_spec_key_error(self):
"""Frequently seeing errors in the logs about a 'Key' error, where a workflow
references something that doesn't exist in the midst of processing. Want to
make sure we produce errors to the front end that allows us to debug this."""
# Start the two_forms workflow, and enter some data in the first form.
self.load_example_data()
study = session.query(StudyModel).first()
workflow_spec_model = self.load_test_spec("two_forms")
processor = self.get_processor(study, workflow_spec_model)
self.assertEqual(processor.workflow_model.workflow_spec_id, workflow_spec_model.id)
task = processor.next_task()
task.data = {"color": "blue"}
processor.complete_task(task)
# Modify the specification, with a major change.
file_path = os.path.join(app.root_path, '..', 'tests', 'data', 'two_forms', 'modified', 'two_forms_struc_mod.bpmn')
self.replace_file("two_forms.bpmn", file_path)
# Attempting a soft update on a structural change should raise a sensible error.
with self.assertRaises(ApiError) as context:
processor3 = WorkflowProcessor(processor.workflow_model, soft_reset=True)
self.assertEqual("unexpected_workflow_structure", context.exception.code)
def test_workflow_with_bad_expression_raises_sensible_error(self):
self.load_example_data()
@ -301,7 +279,9 @@ class TestWorkflowProcessor(BaseTest):
self.assertFalse(processor2.is_latest_spec) # Still at version 1.
# Do a hard reset, which should bring us back to the beginning, but retain the data.
processor3 = WorkflowProcessor(processor.workflow_model, hard_reset=True)
WorkflowProcessor.reset(processor2.workflow_model)
processor3 = WorkflowProcessor(processor.workflow_model)
processor3.do_engine_steps()
self.assertEqual("Step 1", processor3.next_task().task_spec.description)
self.assertTrue(processor3.is_latest_spec) # Now at version 2.
task = processor3.next_task()

View File

@ -0,0 +1,34 @@
from tests.base_test import BaseTest
class TestMessageEvent(BaseTest):

    def test_message_event(self):
        """A workflow with a message event can be restarted, with or without its data."""
        workflow = self.create_workflow('message_event')

        first_task = self.get_workflow_api(workflow).next_task
        self.assertEqual('Activity_GetData', first_task.name)

        workflow_api = self.get_workflow_api(workflow)
        result = self.complete_form(workflow_api, first_task, {'formdata': 'asdf'})
        self.assertIn('formdata', result.next_task.data)
        self.assertEqual('asdf', result.next_task.data['formdata'])

        # NOTE(review): dropped a redundant nested get_workflow_api call that
        # re-fetched the workflow using the already-fetched WorkflowApi object.
        workflow_api = self.get_workflow_api(workflow)
        self.assertEqual('Activity_HowMany', workflow_api.next_task.name)

        # Restart with data: we should land back at the beginning, data intact.
        workflow_api = self.restart_workflow_api(workflow)
        first_task = self.get_workflow_api(workflow).next_task
        self.assertEqual('Activity_GetData', first_task.name)
        self.assertIn('formdata', workflow_api.next_task.data)
        self.assertEqual('asdf', workflow_api.next_task.data['formdata'])

        # Restart without data: the previously entered form data is gone.
        workflow_api = self.restart_workflow_api(workflow, clear_data=True)
        first_task = self.get_workflow_api(workflow).next_task
        self.assertEqual('Activity_GetData', first_task.name)
        self.assertNotIn('formdata', workflow_api.next_task.data)