mirror of
https://github.com/sartography/cr-connect-workflow.git
synced 2025-02-23 13:18:35 +00:00
Added the following columns: * date_created - so we know when the file was created * renamed workflow_version to just "version", because everything has a version, this is the version of the request. * workflow_hash - this is just a quick way to see what files and versions are associated with the request, it could be factored out. * study - a quick relationship link to the study, so that this model is easier to use. * workflow - ditto * approval_files - these is a list from a new link table that links an approval to specific files and versions. The RequestApproval is logically sound, but still needs some additional pieces in place to be callable from a BPMN workflow diagram. Altered the file service to pick up on changes to files vs adding new files, so that versions are picked up correctly as users modify their submission - adding new files or replacing existing ones. Deleting files worries me, and I will need to revisit this. The damn base test keeps giving me a headache, so I made changes there to see if clearing and dropping the database each time won't allow the tests to pass more consistently. Lots more tests around the file service to make sure it is versioning user uploaded files correctly. The "Test Request Approval Script" tries to find to assure the correct behavior as this is likely to be called many times repeatedly and with little knowledge of the internal system. So it should just "do the right thing".
244 lines
9.4 KiB
Python
244 lines
9.4 KiB
Python
# Set environment variable to testing before loading.
|
|
# IMPORTANT - Environment must be loaded before app, models, etc....
|
|
import os
|
|
os.environ["TESTING"] = "true"
|
|
|
|
import json
|
|
import unittest
|
|
import urllib.parse
|
|
import datetime
|
|
|
|
from crc.models.protocol_builder import ProtocolBuilderStatus
|
|
from crc.models.study import StudyModel
|
|
from crc.services.file_service import FileService
|
|
from crc.services.study_service import StudyService
|
|
from crc.models.file import FileModel, FileDataModel, CONTENT_TYPES
|
|
from crc.models.workflow import WorkflowSpecModel, WorkflowSpecModelSchema, WorkflowModel
|
|
from crc.models.user import UserModel
|
|
|
|
from crc import app, db, session
|
|
from example_data import ExampleDataLoader
|
|
|
|
# UNCOMMENT THIS FOR DEBUGGING SQL ALCHEMY QUERIES
|
|
# import logging
|
|
# logging.basicConfig()
|
|
# logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
|
|
|
|
|
|
class BaseTest(unittest.TestCase):
|
|
""" Great class to inherit from, as it sets up and tears down classes
|
|
efficiently when we have a database in place.
|
|
"""
|
|
|
|
if not app.config['TESTING']:
|
|
raise (Exception("INVALID TEST CONFIGURATION. This is almost always in import order issue."
|
|
"The first class to import in each test should be the base_test.py file."))
|
|
|
|
auths = {}
|
|
test_uid = "dhf8r"
|
|
|
|
users = [
|
|
{
|
|
'uid':'dhf8r',
|
|
'email_address':'dhf8r@virginia.EDU',
|
|
'display_name':'Daniel Harold Funk',
|
|
'affiliation':'staff@virginia.edu;member@virginia.edu',
|
|
'eppn':'dhf8r@virginia.edu',
|
|
'first_name':'Daniel',
|
|
'last_name':'Funk',
|
|
'title':'SOFTWARE ENGINEER V'
|
|
}
|
|
]
|
|
|
|
studies = [
|
|
{
|
|
'id':0,
|
|
'title':'The impact of fried pickles on beer consumption in bipedal software developers.',
|
|
'last_updated':datetime.datetime.now(),
|
|
'protocol_builder_status':ProtocolBuilderStatus.ACTIVE,
|
|
'primary_investigator_id':'dhf8r',
|
|
'sponsor':'Sartography Pharmaceuticals',
|
|
'ind_number':'1234',
|
|
'user_uid':'dhf8r'
|
|
},
|
|
{
|
|
'id':1,
|
|
'title':'Requirement of hippocampal neurogenesis for the behavioral effects of soft pretzels',
|
|
'last_updated':datetime.datetime.now(),
|
|
'protocol_builder_status':ProtocolBuilderStatus.ACTIVE,
|
|
'primary_investigator_id':'dhf8r',
|
|
'sponsor':'Makerspace & Co.',
|
|
'ind_number':'5678',
|
|
'user_uid':'dhf8r'
|
|
}
|
|
]
|
|
|
|
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
app.config.from_object('config.testing')
|
|
cls.ctx = app.test_request_context()
|
|
cls.app = app.test_client()
|
|
cls.ctx.push()
|
|
db.create_all()
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
cls.ctx.pop()
|
|
db.drop_all()
|
|
pass
|
|
|
|
def setUp(self):
|
|
pass
|
|
|
|
def tearDown(self):
|
|
ExampleDataLoader.clean_db()
|
|
session.flush()
|
|
self.auths = {}
|
|
|
|
def logged_in_headers(self, user=None, redirect_url='http://some/frontend/url'):
|
|
if user is None:
|
|
uid = self.test_uid
|
|
user_info = {'uid': self.test_uid, 'first_name': 'Daniel', 'last_name': 'Funk',
|
|
'email_address': 'dhf8r@virginia.edu'}
|
|
else:
|
|
uid = user.uid
|
|
user_info = {'uid': user.uid, 'first_name': user.first_name, 'last_name': user.last_name,
|
|
'email_address': user.email_address}
|
|
|
|
query_string = self.user_info_to_query_string(user_info, redirect_url)
|
|
rv = self.app.get("/v1.0/sso_backdoor%s" % query_string, follow_redirects=False)
|
|
self.assertTrue(rv.status_code == 302)
|
|
self.assertTrue(str.startswith(rv.location, redirect_url))
|
|
|
|
user_model = session.query(UserModel).filter_by(uid=uid).first()
|
|
self.assertIsNotNone(user_model.display_name)
|
|
return dict(Authorization='Bearer ' + user_model.encode_auth_token().decode())
|
|
|
|
def load_example_data(self):
|
|
from example_data import ExampleDataLoader
|
|
ExampleDataLoader.clean_db()
|
|
ExampleDataLoader().load_all()
|
|
|
|
for user_json in self.users:
|
|
db.session.add(UserModel(**user_json))
|
|
db.session.commit()
|
|
for study_json in self.studies:
|
|
study_model = StudyModel(**study_json)
|
|
db.session.add(study_model)
|
|
StudyService._add_all_workflow_specs_to_study(study_model)
|
|
db.session.commit()
|
|
db.session.flush()
|
|
|
|
specs = session.query(WorkflowSpecModel).all()
|
|
self.assertIsNotNone(specs)
|
|
|
|
for spec in specs:
|
|
files = session.query(FileModel).filter_by(workflow_spec_id=spec.id).all()
|
|
self.assertIsNotNone(files)
|
|
self.assertGreater(len(files), 0)
|
|
|
|
for spec in specs:
|
|
files = session.query(FileModel).filter_by(workflow_spec_id=spec.id).all()
|
|
self.assertIsNotNone(files)
|
|
self.assertGreater(len(files), 0)
|
|
for file in files:
|
|
file_data = session.query(FileDataModel).filter_by(file_model_id=file.id).all()
|
|
self.assertIsNotNone(file_data)
|
|
self.assertGreater(len(file_data), 0)
|
|
|
|
@staticmethod
|
|
def load_test_spec(dir_name, master_spec=False, category_id=None):
|
|
"""Loads a spec into the database based on a directory in /tests/data"""
|
|
if session.query(WorkflowSpecModel).filter_by(id=dir_name).count() > 0:
|
|
return session.query(WorkflowSpecModel).filter_by(id=dir_name).first()
|
|
filepath = os.path.join(app.root_path, '..', 'tests', 'data', dir_name, "*")
|
|
return ExampleDataLoader().create_spec(id=dir_name, name=dir_name, filepath=filepath, master_spec=master_spec,
|
|
category_id=category_id)
|
|
|
|
@staticmethod
|
|
def protocol_builder_response(file_name):
|
|
filepath = os.path.join(app.root_path, '..', 'tests', 'data', 'pb_responses', file_name)
|
|
with open(filepath, 'r') as myfile:
|
|
data = myfile.read()
|
|
return data
|
|
|
|
def assert_success(self, rv, msg=""):
|
|
try:
|
|
data = json.loads(rv.get_data(as_text=True))
|
|
self.assertTrue(200 <= rv.status_code < 300,
|
|
"BAD Response: %i. \n %s" %
|
|
(rv.status_code, json.dumps(data)) + ". " + msg)
|
|
except:
|
|
self.assertTrue(200 <= rv.status_code < 300,
|
|
"BAD Response: %i." % rv.status_code + ". " + msg)
|
|
|
|
def assert_failure(self, rv, status_code=0, error_code=""):
|
|
self.assertFalse(200 <= rv.status_code < 300,
|
|
"Incorrect Valid Response:" + rv.status + ".")
|
|
if status_code != 0:
|
|
self.assertEqual(status_code, rv.status_code)
|
|
if error_code != "":
|
|
data = json.loads(rv.get_data(as_text=True))
|
|
self.assertEqual(error_code, data["code"])
|
|
|
|
@staticmethod
|
|
def user_info_to_query_string(user_info, redirect_url):
|
|
query_string_list = []
|
|
items = user_info.items()
|
|
for key, value in items:
|
|
query_string_list.append('%s=%s' % (key, urllib.parse.quote(value)))
|
|
|
|
query_string_list.append('redirect_url=%s' % redirect_url)
|
|
|
|
return '?%s' % '&'.join(query_string_list)
|
|
|
|
|
|
def replace_file(self, name, file_path):
|
|
"""Replaces a stored file with the given name with the contents of the file at the given path."""
|
|
file_service = FileService()
|
|
file = open(file_path, "rb")
|
|
data = file.read()
|
|
|
|
file_model = db.session.query(FileModel).filter(FileModel.name == name).first()
|
|
noise, file_extension = os.path.splitext(file_path)
|
|
content_type = CONTENT_TYPES[file_extension[1:]]
|
|
file_service.update_file(file_model, data, content_type)
|
|
|
|
def create_user(self, uid="dhf8r", email="daniel.h.funk@gmail.com", display_name="Hoopy Frood"):
|
|
user = session.query(UserModel).filter(UserModel.uid == uid).first()
|
|
if user is None:
|
|
user = UserModel(uid=uid, email_address=email, display_name=display_name)
|
|
db.session.add(user)
|
|
db.session.commit()
|
|
return user
|
|
|
|
def create_study(self, uid="dhf8r", title="Beer conception in the bipedal software engineer"):
|
|
study = session.query(StudyModel).first()
|
|
if study is None:
|
|
user = self.create_user(uid=uid)
|
|
study = StudyModel(title=title, protocol_builder_status=ProtocolBuilderStatus.ACTIVE,
|
|
user_uid=user.uid)
|
|
db.session.add(study)
|
|
db.session.commit()
|
|
return study
|
|
|
|
def create_workflow(self, workflow_name, study=None, category_id=None):
|
|
db.session.flush()
|
|
workflow = db.session.query(WorkflowSpecModel).filter(WorkflowSpecModel.name == workflow_name).first()
|
|
if workflow:
|
|
return workflow
|
|
if study is None:
|
|
study = self.create_study()
|
|
spec = self.load_test_spec(workflow_name, category_id=category_id)
|
|
workflow_model = StudyService._create_workflow_model(study, spec)
|
|
return workflow_model
|
|
|
|
def create_reference_document(self):
|
|
file_path = os.path.join(app.root_path, 'static', 'reference', 'irb_documents.xlsx')
|
|
file = open(file_path, "rb")
|
|
FileService.add_reference_file(FileService.DOCUMENT_LIST,
|
|
binary_data=file.read(),
|
|
content_type=CONTENT_TYPES['xls'])
|
|
file.close()
|