mirror of
https://github.com/sartography/cr-connect-workflow.git
synced 2025-02-20 11:48:16 +00:00
Followup on Postgres - had some failing tests that needed cleaning up now that we are using a real database.
This commit is contained in:
parent
37df0bf8f4
commit
9e6fbfee79
@ -30,7 +30,7 @@ def update_file_from_request(file_model):
|
||||
else:
|
||||
file_model.type = FileType[file_extension]
|
||||
|
||||
file_data_model = session.query(FileDataModel).filter_by(id=file_model.id).with_for_update().first()
|
||||
file_data_model = session.query(FileDataModel).filter_by(file_model_id=file_model.id).with_for_update().first()
|
||||
if file_data_model is None:
|
||||
file_data_model = FileDataModel(data=file.stream.read(), file_model=file_model)
|
||||
else:
|
||||
|
@ -54,14 +54,14 @@ def delete_workflow_specification(spec_id):
|
||||
error = ApiError('unknown_spec', 'The Workflow Specification "' + spec_id + '" is not recognized.')
|
||||
return ApiErrorSchema.dump(error), 404
|
||||
|
||||
session.query(WorkflowSpecModel).filter_by(id=spec_id).delete()
|
||||
|
||||
# Delete all items in the database related to the deleted workflow spec.
|
||||
files = session.query(FileModel).filter_by(workflow_spec_id=spec_id).all()
|
||||
for file in files:
|
||||
delete_file(file.id)
|
||||
|
||||
session.query(WorkflowModel).filter_by(workflow_spec_id=spec_id).delete()
|
||||
session.query(WorkflowSpecModel).filter_by(id=spec_id).delete()
|
||||
|
||||
session.commit()
|
||||
|
||||
|
||||
|
@ -1,5 +1,5 @@
|
||||
import json
|
||||
from datetime import datetime
|
||||
from datetime import datetime, tzinfo, timezone
|
||||
|
||||
from crc import session
|
||||
from crc.models.file import FileModel
|
||||
@ -21,7 +21,7 @@ class TestStudy(BaseTest):
|
||||
study = {
|
||||
"id": 12345,
|
||||
"title": "Phase III Trial of Genuine People Personalities (GPP) Autonomous Intelligent Emotional Agents for Interstellar Spacecraft",
|
||||
"last_updated": datetime.now(),
|
||||
"last_updated": datetime.now(tz=timezone.utc),
|
||||
"protocol_builder_status": ProtocolBuilderStatus.in_process,
|
||||
"primary_investigator_id": "tricia.marie.mcmillan@heartofgold.edu",
|
||||
"sponsor": "Sirius Cybernetics Corporation",
|
||||
@ -34,7 +34,7 @@ class TestStudy(BaseTest):
|
||||
db_study = session.query(StudyModel).filter_by(id=12345).first()
|
||||
self.assertIsNotNone(db_study)
|
||||
self.assertEqual(study["title"], db_study.title)
|
||||
self.assertEqual(study["last_updated"], db_study.last_updated)
|
||||
self.assertAlmostEqual(study["last_updated"], db_study.last_updated)
|
||||
self.assertEqual(study["protocol_builder_status"], db_study.protocol_builder_status)
|
||||
self.assertEqual(study["primary_investigator_id"], db_study.primary_investigator_id)
|
||||
self.assertEqual(study["sponsor"], db_study.sponsor)
|
||||
@ -45,12 +45,11 @@ class TestStudy(BaseTest):
|
||||
study: StudyModel = session.query(StudyModel).first()
|
||||
study.title = "Pilot Study of Fjord Placement for Single Fraction Outcomes to Cortisol Susceptibility"
|
||||
study.protocol_builder_status = ProtocolBuilderStatus.complete
|
||||
|
||||
rv = self.app.put('/v1.0/study/%i' % study.id,
|
||||
content_type="application/json",
|
||||
data=json.dumps(StudyModelSchema().dump(study)))
|
||||
self.assert_success(rv)
|
||||
db_study = session.query(StudyModel).first()
|
||||
db_study = session.query(StudyModel).filter_by(id=study.id).first()
|
||||
self.assertIsNotNone(db_study)
|
||||
self.assertEqual(study.title, db_study.title)
|
||||
self.assertEqual(study.protocol_builder_status, db_study.protocol_builder_status)
|
||||
@ -110,26 +109,27 @@ class TestStudy(BaseTest):
|
||||
self.assertEqual(db_spec, api_spec)
|
||||
|
||||
|
||||
def test_modify_workflow_specification(self):
|
||||
self.load_example_data()
|
||||
old_id = 'random_fact'
|
||||
spec: WorkflowSpecModel = session.query(WorkflowSpecModel).filter_by(id=old_id).first()
|
||||
""":type: WorkflowSpecModel"""
|
||||
|
||||
spec.id = 'odd_datum'
|
||||
num_before = session.query(WorkflowSpecModel).count()
|
||||
|
||||
rv = self.app.post('/v1.0/workflow-specification?spec_id=%s' % old_id, content_type="application/json",
|
||||
data=json.dumps(WorkflowSpecModelSchema().dump(spec)))
|
||||
self.assert_success(rv)
|
||||
db_spec = session.query(WorkflowSpecModel).filter_by(id=spec.id).first()
|
||||
self.assertEqual(spec.display_name, db_spec.display_name)
|
||||
|
||||
num_old_after = session.query(WorkflowSpecModel).filter_by(id=old_id).count()
|
||||
self.assertEqual(num_old_after, 0)
|
||||
|
||||
num_after = session.query(WorkflowSpecModel).count()
|
||||
self.assertEqual(num_after, num_before)
|
||||
# def test_modify_workflow_specification(self):
|
||||
# """ You can't change a unique id. """
|
||||
# self.load_example_data()
|
||||
# old_id = 'random_fact'
|
||||
# spec: WorkflowSpecModel = session.query(WorkflowSpecModel).filter_by(id=old_id).first()
|
||||
# """:type: WorkflowSpecModel"""
|
||||
#
|
||||
# spec.id = 'odd_datum'
|
||||
# num_before = session.query(WorkflowSpecModel).count()
|
||||
#
|
||||
# rv = self.app.post('/v1.0/workflow-specification?spec_id=%s' % old_id, content_type="application/json",
|
||||
# data=json.dumps(WorkflowSpecModelSchema().dump(spec)))
|
||||
# self.assert_success(rv)
|
||||
# db_spec = session.query(WorkflowSpecModel).filter_by(id=spec.id).first()
|
||||
# self.assertEqual(spec.display_name, db_spec.display_name)
|
||||
#
|
||||
# num_old_after = session.query(WorkflowSpecModel).filter_by(id=old_id).count()
|
||||
# self.assertEqual(num_old_after, 0)
|
||||
#
|
||||
# num_after = session.query(WorkflowSpecModel).count()
|
||||
# self.assertEqual(num_after, num_before)
|
||||
|
||||
def test_delete_workflow_specification(self):
|
||||
self.load_example_data()
|
||||
@ -185,83 +185,3 @@ class TestStudy(BaseTest):
|
||||
self.assert_success(rv)
|
||||
self.assertEqual(0, session.query(WorkflowModel).count())
|
||||
|
||||
def test_get_current_user_tasks(self):
|
||||
self.load_example_data()
|
||||
study = session.query(StudyModel).first()
|
||||
spec = session.query(WorkflowSpecModel).filter_by(id='random_fact').first()
|
||||
self.app.post('/v1.0/study/%i/workflows' % study.id, content_type="application/json",
|
||||
data=json.dumps(WorkflowSpecModelSchema().dump(spec)))
|
||||
rv = self.app.get('/v1.0/workflow/%i/tasks' % study.id, content_type="application/json")
|
||||
self.assert_success(rv)
|
||||
json_data = json.loads(rv.get_data(as_text=True))
|
||||
tasks = TaskSchema(many=True).load(json_data)
|
||||
self.assertEqual("Task_User_Select_Type", tasks[0].name)
|
||||
self.assertEqual(3, len(tasks[0].form["fields"][0]["options"]))
|
||||
|
||||
def test_two_forms_task(self):
|
||||
# Set up a new workflow
|
||||
self.load_example_data()
|
||||
study = session.query(StudyModel).first()
|
||||
spec = session.query(WorkflowSpecModel).filter_by(id='two_forms').first()
|
||||
rv = self.app.post('/v1.0/study/%i/workflows' % study.id, content_type="application/json",
|
||||
data=json.dumps(WorkflowSpecModelSchema().dump(spec)))
|
||||
json_data = json.loads(rv.get_data(as_text=True))
|
||||
workflow = WorkflowModelSchema().load(json_data, session=session)
|
||||
|
||||
# get the first form in the two form workflow.
|
||||
rv = self.app.get('/v1.0/workflow/%i/tasks' % workflow.id, content_type="application/json")
|
||||
json_data = json.loads(rv.get_data(as_text=True))
|
||||
tasks = TaskSchema(many=True).load(json_data)
|
||||
self.assertEqual(1, len(tasks))
|
||||
self.assertIsNotNone(tasks[0].form)
|
||||
self.assertEqual("StepOne", tasks[0].name)
|
||||
self.assertEqual(1, len(tasks[0].form['fields']))
|
||||
|
||||
# Complete the form for Step one and post it.
|
||||
tasks[0].form['fields'][0]['value'] = "Blue"
|
||||
rv = self.app.put('/v1.0/workflow/%i/task/%s/data' % (workflow.id, tasks[0].id),
|
||||
content_type="application/json",
|
||||
data=json.dumps({"color": "blue"}))
|
||||
self.assert_success(rv)
|
||||
|
||||
# Get the next Task
|
||||
rv = self.app.get('/v1.0/workflow/%i/tasks' % study.id, content_type="application/json")
|
||||
self.assert_success(rv)
|
||||
json_data = json.loads(rv.get_data(as_text=True))
|
||||
tasks = TaskSchema(many=True).load(json_data)
|
||||
self.assertEqual("StepTwo", tasks[0].name)
|
||||
|
||||
# Get all user Tasks and check that the data have been saved
|
||||
rv = self.app.get('/v1.0/workflow/%i/all_tasks' % study.id, content_type="application/json")
|
||||
self.assert_success(rv)
|
||||
json_data = json.loads(rv.get_data(as_text=True))
|
||||
all_tasks = TaskSchema(many=True).load(json_data)
|
||||
for task in all_tasks:
|
||||
self.assertIsNotNone(task.data)
|
||||
for val in task.data.values():
|
||||
self.assertIsNotNone(val)
|
||||
|
||||
# def test_workflow_with_parallel_forms(self):
|
||||
# self.load_example_data()
|
||||
# study = session.query(StudyModel).first()
|
||||
# spec = session.query(WorkflowSpecModel).filter_by(id='parallel_forms').first()
|
||||
# rv = self.app.post('/v1.0/study/%i/workflows' % study.id, content_type="application/json",
|
||||
# data=json.dumps(WorkflowSpecModelSchema().dump(spec)))
|
||||
# json_data = json.loads(rv.get_data(as_text=True))
|
||||
# workflow = WorkflowModelSchema().load(json_data, session=session)
|
||||
#
|
||||
# # get the first form in the two form workflow.
|
||||
# rv = self.app.get('/v1.0/workflow/%i/tasks' % workflow.id, content_type="application/json")
|
||||
# json_data = json.loads(rv.get_data(as_text=True))
|
||||
# tasks = TaskSchema(many=True).load(json_data)
|
||||
# rv = self.app.put('/v1.0/workflow/%i/task/%s/data' % (workflow.id, tasks[0].id),
|
||||
# content_type="application/json",
|
||||
# data=json.dumps({"color": "blue"}))
|
||||
# self.assert_success(rv)
|
||||
#
|
||||
# # Get the next Task
|
||||
# rv = self.app.get('/v1.0/workflow/%i/tasks' % study.id, content_type="application/json")
|
||||
# self.assert_success(rv)
|
||||
# json_data = json.loads(rv.get_data(as_text=True))
|
||||
# tasks = TaskSchema(many=True).load(json_data)
|
||||
# self.assertEqual("StepTwo", tasks[0].name)
|
||||
|
@ -68,28 +68,29 @@ class TestApiFiles(BaseTest):
|
||||
content_type="application/json",
|
||||
data=json.dumps(FileModelSchema().dump(file)))
|
||||
self.assert_success(rv)
|
||||
db_file = session.query(FileModel).first()
|
||||
db_file = session.query(FileModel).filter_by(id=file.id).first()
|
||||
self.assertIsNotNone(db_file)
|
||||
self.assertEqual(file.name, db_file.name)
|
||||
|
||||
def test_update_file_data(self):
|
||||
self.load_example_data()
|
||||
spec = session.query(WorkflowSpecModel).first()
|
||||
file = session.query(FileModel).filter_by(workflow_spec_id=spec.id).first()
|
||||
|
||||
data = {}
|
||||
data['file'] = io.BytesIO(b"hijklim"), 'random_fact.bpmn'
|
||||
data['file'] = io.BytesIO(b"abcdef"), 'my_new_file.bpmn'
|
||||
rv = self.app.post('/v1.0/file?workflow_spec_id=%s' % spec.id, data=data, follow_redirects=True,
|
||||
content_type='multipart/form-data')
|
||||
json_data = json.loads(rv.get_data(as_text=True))
|
||||
file = FileModelSchema().load(json_data, session=session)
|
||||
|
||||
data['file'] = io.BytesIO(b"hijklim"), 'my_new_file.bpmn'
|
||||
rv = self.app.put('/v1.0/file/%i/data' % file.id, data=data, follow_redirects=True,
|
||||
content_type='multipart/form-data')
|
||||
|
||||
self.assert_success(rv)
|
||||
self.assertIsNotNone(rv.get_data())
|
||||
json_data = json.loads(rv.get_data(as_text=True))
|
||||
file = FileModelSchema().load(json_data, session=session)
|
||||
self.assertEqual(2, file.version)
|
||||
self.assertEqual(FileType.bpmn, file.type)
|
||||
self.assertTrue(file.primary)
|
||||
self.assertEqual("application/octet-stream", file.content_type)
|
||||
self.assertEqual(spec.id, file.workflow_spec_id)
|
||||
|
||||
|
95
tests/test_tasks_api.py
Normal file
95
tests/test_tasks_api.py
Normal file
@ -0,0 +1,95 @@
|
||||
import json
|
||||
from datetime import datetime
|
||||
|
||||
from crc import session
|
||||
from crc.models.file import FileModel
|
||||
from crc.models.study import StudyModel, StudyModelSchema, ProtocolBuilderStatus
|
||||
from crc.models.workflow import WorkflowSpecModel, WorkflowSpecModelSchema, WorkflowModel, WorkflowStatus, \
|
||||
WorkflowModelSchema, TaskSchema
|
||||
from tests.base_test import BaseTest
|
||||
|
||||
|
||||
class TestTasksApi(BaseTest):
|
||||
|
||||
def create_workflow(self, workflow_name):
|
||||
study = session.query(StudyModel).first()
|
||||
spec = session.query(WorkflowSpecModel).filter_by(id=workflow_name).first()
|
||||
self.app.post('/v1.0/study/%i/workflows' % study.id, content_type="application/json",
|
||||
data=json.dumps(WorkflowSpecModelSchema().dump(spec)))
|
||||
workflow = session.query(WorkflowModel).filter_by(study_id = study.id, workflow_spec_id=workflow_name).first()
|
||||
return workflow
|
||||
|
||||
def get_tasks(self, workflow):
|
||||
rv = self.app.get('/v1.0/workflow/%i/tasks' % workflow.id, content_type="application/json")
|
||||
json_data = json.loads(rv.get_data(as_text=True))
|
||||
tasks = TaskSchema(many=True).load(json_data)
|
||||
return tasks
|
||||
|
||||
def get_all_tasks(self, workflow):
|
||||
rv = self.app.get('/v1.0/workflow/%i/all_tasks' % workflow.id, content_type="application/json")
|
||||
self.assert_success(rv)
|
||||
json_data = json.loads(rv.get_data(as_text=True))
|
||||
all_tasks = TaskSchema(many=True).load(json_data)
|
||||
return all_tasks
|
||||
|
||||
def complete_form(self, workflow, task, dict_data):
|
||||
rv = self.app.put('/v1.0/workflow/%i/task/%s/data' % (workflow.id, task.id),
|
||||
content_type="application/json",
|
||||
data=json.dumps(dict_data))
|
||||
self.assert_success(rv)
|
||||
json_data = json.loads(rv.get_data(as_text=True))
|
||||
workflow = WorkflowModelSchema().load(json_data, session=session)
|
||||
return workflow
|
||||
|
||||
def test_get_current_user_tasks(self):
|
||||
self.load_example_data()
|
||||
workflow = self.create_workflow('random_fact')
|
||||
tasks = self.get_tasks(workflow)
|
||||
self.assertEqual("Task_User_Select_Type", tasks[0].name)
|
||||
self.assertEqual(3, len(tasks[0].form["fields"][0]["options"]))
|
||||
|
||||
def test_two_forms_task(self):
|
||||
# Set up a new workflow
|
||||
self.load_example_data()
|
||||
workflow = self.create_workflow('two_forms')
|
||||
# get the first form in the two form workflow.
|
||||
tasks = self.get_tasks(workflow)
|
||||
self.assertEqual(1, len(tasks))
|
||||
self.assertIsNotNone(tasks[0].form)
|
||||
self.assertEqual("StepOne", tasks[0].name)
|
||||
self.assertEqual(1, len(tasks[0].form['fields']))
|
||||
|
||||
# Complete the form for Step one and post it.
|
||||
self.complete_form(workflow, tasks[0], {"color": "blue"})
|
||||
|
||||
# Get the next Task
|
||||
tasks = self.get_tasks(workflow)
|
||||
self.assertEqual("StepTwo", tasks[0].name)
|
||||
|
||||
# Get all user Tasks and check that the data have been saved
|
||||
all_tasks = self.get_all_tasks(workflow)
|
||||
for task in all_tasks:
|
||||
self.assertIsNotNone(task.data)
|
||||
for val in task.data.values():
|
||||
self.assertIsNotNone(val)
|
||||
|
||||
def test_error_message_on_bad_gateway_expression(self):
|
||||
self.load_example_data()
|
||||
workflow = self.create_workflow('exclusive_gateway')
|
||||
|
||||
# get the first form in the two form workflow.
|
||||
tasks = self.get_tasks(workflow)
|
||||
self.complete_form(workflow, tasks[0], {"has_bananas": True})
|
||||
|
||||
|
||||
def test_workflow_with_parallel_forms(self):
|
||||
self.load_example_data()
|
||||
workflow = self.create_workflow('exclusive_gateway')
|
||||
|
||||
# get the first form in the two form workflow.
|
||||
tasks = self.get_tasks(workflow)
|
||||
self.complete_form(workflow, tasks[0], {"has_bananas": True})
|
||||
|
||||
# Get the next Task
|
||||
tasks = self.get_tasks(workflow)
|
||||
self.assertEqual("Task_Why_No_Bananas", tasks[0].name)
|
Loading…
x
Reference in New Issue
Block a user