2019-12-27 18:50:03 +00:00
|
|
|
import io
|
|
|
|
import json
|
2021-07-06 17:10:20 +00:00
|
|
|
import os
|
A major refactor of how we search and store files, as there was a lot of confusing bits in here.
From an API point of view you can do the following (and only the following)
/files?workflow_spec_id=x
* You can find all files associated with a workflow_spec_id, and add a file with a workflow_spec_id
/files?workflow_id=x
* You can find all files associated with a workflow_id, and add a file that is directly associated with the workflow
/files?workflow_id=x&form_field_key=y
* You can find all files associated with a form element on a running workflow, and add a new file.
Note: you can add multiple files to the same form_field_key, IF they have different file names. If the same name, the original file is archived,
and the new file takes its place.
The study endpoints always return a list of the file metadata associated with the study. Removed /studies-files, but there is an
endpoint called
/studies/all - that returns all the studies in the system, and does include their files.
On a deeper level:
The File model no longer contains:
- study_id,
- task_id,
- form_field_key
Instead, if the file is associated with workflow - then that is the one way it is connected to the study, and we use this relationship to find files for a study.
A file is never associated with a task_id, as these change when the workflow is reloaded.
The form_field_key must match the irb_doc_code, so when requesting files for a form field, we just look up the irb_doc_code.
2020-05-28 12:27:26 +00:00
|
|
|
|
|
|
|
from tests.base_test import BaseTest
|
2019-12-27 18:50:03 +00:00
|
|
|
|
2021-07-06 17:10:20 +00:00
|
|
|
from crc import session, db, app
|
2020-05-29 00:03:50 +00:00
|
|
|
from crc.models.file import FileModel, FileType, FileSchema, FileModelSchema
|
2020-01-24 16:52:52 +00:00
|
|
|
from crc.models.workflow import WorkflowSpecModel
|
2020-03-20 12:21:21 +00:00
|
|
|
from crc.services.file_service import FileService
|
2020-03-19 21:13:30 +00:00
|
|
|
from crc.services.workflow_processor import WorkflowProcessor
|
2021-06-08 12:03:14 +00:00
|
|
|
from crc.models.data_store import DataStoreModel
|
2021-07-06 17:10:20 +00:00
|
|
|
from crc.services.document_service import DocumentService
|
2020-05-07 17:57:24 +00:00
|
|
|
from example_data import ExampleDataLoader
|
2019-12-27 18:50:03 +00:00
|
|
|
|
|
|
|
|
2020-02-20 18:30:04 +00:00
|
|
|
class TestFilesApi(BaseTest):
|
2019-12-27 18:50:03 +00:00
|
|
|
|
|
|
|
def test_list_files_for_workflow_spec(self):
|
2020-05-25 16:29:05 +00:00
|
|
|
self.load_example_data(use_crc_data=True)
|
2020-04-15 14:58:13 +00:00
|
|
|
spec_id = 'core_info'
|
|
|
|
spec = session.query(WorkflowSpecModel).filter_by(id=spec_id).first()
|
|
|
|
rv = self.app.get('/v1.0/file?workflow_spec_id=%s' % spec_id,
|
2019-12-27 18:50:03 +00:00
|
|
|
follow_redirects=True,
|
2020-03-24 18:15:21 +00:00
|
|
|
content_type="application/json", headers=self.logged_in_headers())
|
2019-12-27 18:50:03 +00:00
|
|
|
self.assert_success(rv)
|
|
|
|
json_data = json.loads(rv.get_data(as_text=True))
|
2020-04-28 02:54:05 +00:00
|
|
|
self.assertEqual(5, len(json_data))
|
|
|
|
files = FileModelSchema(many=True).load(json_data, session=session)
|
|
|
|
file_names = [f.name for f in files]
|
|
|
|
self.assertTrue("%s.bpmn" % spec.name in file_names)
|
2019-12-27 18:50:03 +00:00
|
|
|
|
|
|
|
def test_list_multiple_files_for_workflow_spec(self):
|
|
|
|
self.load_example_data()
|
Created a "StudyService" and moved all complex logic around study manipulation out of the study api, and this service, as things were getting complicated. The Workflow Processor no longer creates the WorkflowModel, the study object handles that, and only passes the model into the workflow processor when it is ready to start the workflow.
Created a Study object (seperate from the StudyModel) that can cronstructed on request, and contains a different data structure than we store in the DB. This allows us to return underlying Categories and Workflows in a clean way.
Added a new status to workflows called "not_started", meaning we have not yet instantiated a processor or created a BPMN, they have no version yet and no stored data, just the possiblity of being started.
The Top Level Workflow or "Master" workflow is now a part of the sample data, and loaded at all times.
Removed the ability to "add a workflow to a study" and "remove a workflow from a study", a study contains all possible workflows by definition.
Example data no longer creates users or studies, it just creates the specs.
2020-03-30 12:00:16 +00:00
|
|
|
spec = self.load_test_spec("random_fact")
|
2020-03-04 18:40:25 +00:00
|
|
|
svgFile = FileModel(name="test.svg", type=FileType.svg,
|
2019-12-27 18:50:03 +00:00
|
|
|
primary=False, workflow_spec_id=spec.id)
|
2020-01-14 16:45:12 +00:00
|
|
|
session.add(svgFile)
|
|
|
|
session.flush()
|
2020-02-04 02:56:18 +00:00
|
|
|
rv = self.app.get('/v1.0/file?workflow_spec_id=%s' % spec.id,
|
2019-12-27 18:50:03 +00:00
|
|
|
follow_redirects=True,
|
2020-03-24 18:15:21 +00:00
|
|
|
content_type="application/json", headers=self.logged_in_headers())
|
2019-12-27 18:50:03 +00:00
|
|
|
self.assert_success(rv)
|
|
|
|
json_data = json.loads(rv.get_data(as_text=True))
|
2020-12-14 15:37:16 +00:00
|
|
|
self.assertEqual(3, len(json_data))
|
2019-12-27 18:50:03 +00:00
|
|
|
|
2020-06-04 13:49:42 +00:00
|
|
|
|
2019-12-27 18:50:03 +00:00
|
|
|
def test_create_file(self):
|
|
|
|
self.load_example_data()
|
2020-01-14 16:45:12 +00:00
|
|
|
spec = session.query(WorkflowSpecModel).first()
|
2020-02-04 02:56:18 +00:00
|
|
|
data = {'file': (io.BytesIO(b"abcdef"), 'random_fact.svg')}
|
|
|
|
rv = self.app.post('/v1.0/file?workflow_spec_id=%s' % spec.id, data=data, follow_redirects=True,
|
2020-03-24 18:15:21 +00:00
|
|
|
content_type='multipart/form-data', headers=self.logged_in_headers())
|
2019-12-27 18:50:03 +00:00
|
|
|
|
|
|
|
self.assert_success(rv)
|
|
|
|
self.assertIsNotNone(rv.get_data())
|
|
|
|
json_data = json.loads(rv.get_data(as_text=True))
|
2020-01-14 16:45:12 +00:00
|
|
|
file = FileModelSchema().load(json_data, session=session)
|
2019-12-27 18:50:03 +00:00
|
|
|
self.assertEqual(FileType.svg, file.type)
|
|
|
|
self.assertFalse(file.primary)
|
|
|
|
self.assertEqual("image/svg+xml", file.content_type)
|
|
|
|
self.assertEqual(spec.id, file.workflow_spec_id)
|
|
|
|
|
2020-03-24 18:15:21 +00:00
|
|
|
rv = self.app.get('/v1.0/file/%i' % file.id, headers=self.logged_in_headers())
|
2019-12-27 18:50:03 +00:00
|
|
|
self.assert_success(rv)
|
|
|
|
json_data = json.loads(rv.get_data(as_text=True))
|
2020-01-14 16:45:12 +00:00
|
|
|
file2 = FileModelSchema().load(json_data, session=session)
|
2019-12-27 18:50:03 +00:00
|
|
|
self.assertEqual(file, file2)
|
|
|
|
|
2020-03-19 21:13:30 +00:00
|
|
|
def test_add_file_from_task_and_form_errors_on_invalid_form_field_name(self):
|
|
|
|
self.create_reference_document()
|
|
|
|
workflow = self.create_workflow('file_upload_form')
|
|
|
|
processor = WorkflowProcessor(workflow)
|
2020-07-28 17:33:38 +00:00
|
|
|
processor.do_engine_steps()
|
2020-03-19 21:13:30 +00:00
|
|
|
task = processor.next_task()
|
|
|
|
data = {'file': (io.BytesIO(b"abcdef"), 'random_fact.svg')}
|
|
|
|
correct_name = task.task_spec.form.fields[0].id
|
|
|
|
|
|
|
|
data = {'file': (io.BytesIO(b"abcdef"), 'random_fact.svg')}
|
|
|
|
rv = self.app.post('/v1.0/file?study_id=%i&workflow_id=%s&task_id=%i&form_field_key=%s' %
|
|
|
|
(workflow.study_id, workflow.id, task.id, correct_name), data=data, follow_redirects=True,
|
2020-03-24 18:15:21 +00:00
|
|
|
content_type='multipart/form-data', headers=self.logged_in_headers())
|
2020-03-19 21:13:30 +00:00
|
|
|
self.assert_success(rv)
|
|
|
|
|
2020-06-04 13:49:42 +00:00
|
|
|
def test_archive_file_no_longer_shows_up(self):
|
|
|
|
self.load_example_data()
|
|
|
|
self.create_reference_document()
|
|
|
|
workflow = self.create_workflow('file_upload_form')
|
|
|
|
processor = WorkflowProcessor(workflow)
|
2020-07-28 17:33:38 +00:00
|
|
|
processor.do_engine_steps()
|
2020-06-04 13:49:42 +00:00
|
|
|
task = processor.next_task()
|
|
|
|
data = {'file': (io.BytesIO(b"abcdef"), 'random_fact.svg')}
|
|
|
|
correct_name = task.task_spec.form.fields[0].id
|
|
|
|
|
|
|
|
data = {'file': (io.BytesIO(b"abcdef"), 'random_fact.svg')}
|
|
|
|
rv = self.app.post('/v1.0/file?study_id=%i&workflow_id=%s&task_id=%i&form_field_key=%s' %
|
|
|
|
(workflow.study_id, workflow.id, task.id, correct_name), data=data, follow_redirects=True,
|
|
|
|
content_type='multipart/form-data', headers=self.logged_in_headers())
|
|
|
|
|
|
|
|
self.assert_success(rv)
|
|
|
|
rv = self.app.get('/v1.0/file?workflow_id=%s' % workflow.id, headers=self.logged_in_headers())
|
|
|
|
self.assert_success(rv)
|
2020-06-18 03:11:47 +00:00
|
|
|
self.assertEqual(1, len(json.loads(rv.get_data(as_text=True))))
|
2020-06-04 13:49:42 +00:00
|
|
|
|
|
|
|
file_model = db.session.query(FileModel).filter(FileModel.workflow_id == workflow.id).all()
|
2020-06-18 03:11:47 +00:00
|
|
|
self.assertEqual(1, len(file_model))
|
2020-06-04 13:49:42 +00:00
|
|
|
file_model[0].archived = True
|
|
|
|
db.session.commit()
|
|
|
|
|
|
|
|
rv = self.app.get('/v1.0/file?workflow_id=%s' % workflow.id, headers=self.logged_in_headers())
|
|
|
|
self.assert_success(rv)
|
2020-06-18 03:11:47 +00:00
|
|
|
self.assertEqual(0, len(json.loads(rv.get_data(as_text=True))))
|
2020-06-04 13:49:42 +00:00
|
|
|
|
2020-03-13 19:03:57 +00:00
|
|
|
def test_set_reference_file(self):
|
2021-07-06 17:10:20 +00:00
|
|
|
file_name = "irb_documents.xlsx"
|
|
|
|
filepath = os.path.join(app.root_path, 'static', 'reference', 'irb_documents.xlsx')
|
|
|
|
with open(filepath, 'rb') as myfile:
|
|
|
|
file_data = myfile.read()
|
|
|
|
data = {'file': (io.BytesIO(file_data), file_name)}
|
2020-03-13 19:03:57 +00:00
|
|
|
rv = self.app.put('/v1.0/reference_file/%s' % file_name, data=data, follow_redirects=True,
|
2020-03-24 18:15:21 +00:00
|
|
|
content_type='multipart/form-data', headers=self.logged_in_headers())
|
2020-03-13 19:03:57 +00:00
|
|
|
self.assert_success(rv)
|
|
|
|
self.assertIsNotNone(rv.get_data())
|
|
|
|
json_data = json.loads(rv.get_data(as_text=True))
|
|
|
|
file = FileModelSchema().load(json_data, session=session)
|
2021-07-06 17:10:20 +00:00
|
|
|
self.assertEqual(FileType.xlsx, file.type)
|
2020-03-13 19:03:57 +00:00
|
|
|
self.assertTrue(file.is_reference)
|
2021-07-06 17:10:20 +00:00
|
|
|
self.assertEqual("application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", file.content_type)
|
2020-03-13 19:03:57 +00:00
|
|
|
|
|
|
|
def test_set_reference_file_bad_extension(self):
|
2021-07-06 17:10:20 +00:00
|
|
|
file_name = DocumentService.DOCUMENT_LIST
|
2020-03-13 19:03:57 +00:00
|
|
|
data = {'file': (io.BytesIO(b"abcdef"), "does_not_matter.ppt")}
|
|
|
|
rv = self.app.put('/v1.0/reference_file/%s' % file_name, data=data, follow_redirects=True,
|
2020-03-24 18:15:21 +00:00
|
|
|
content_type='multipart/form-data', headers=self.logged_in_headers())
|
2020-03-13 19:03:57 +00:00
|
|
|
self.assert_failure(rv, error_code="invalid_file_type")
|
|
|
|
|
|
|
|
def test_get_reference_file(self):
|
|
|
|
file_name = "irb_document_types.xls"
|
2021-07-06 17:10:20 +00:00
|
|
|
filepath = os.path.join(app.root_path, 'static', 'reference', 'irb_documents.xlsx')
|
|
|
|
with open(filepath, 'rb') as myfile:
|
|
|
|
file_data = myfile.read()
|
|
|
|
data = {'file': (io.BytesIO(file_data), file_name)}
|
2020-03-13 19:03:57 +00:00
|
|
|
rv = self.app.put('/v1.0/reference_file/%s' % file_name, data=data, follow_redirects=True,
|
2020-03-24 18:15:21 +00:00
|
|
|
content_type='multipart/form-data', headers=self.logged_in_headers())
|
|
|
|
rv = self.app.get('/v1.0/reference_file/%s' % file_name, headers=self.logged_in_headers())
|
2020-03-13 19:03:57 +00:00
|
|
|
self.assert_success(rv)
|
|
|
|
data_out = rv.get_data()
|
2021-07-06 17:10:20 +00:00
|
|
|
self.assertEqual(file_data, data_out)
|
2020-03-13 19:03:57 +00:00
|
|
|
|
|
|
|
def test_list_reference_files(self):
|
2020-05-07 17:57:24 +00:00
|
|
|
ExampleDataLoader.clean_db()
|
|
|
|
|
2021-07-06 17:10:20 +00:00
|
|
|
file_name = DocumentService.DOCUMENT_LIST
|
|
|
|
filepath = os.path.join(app.root_path, 'static', 'reference', 'irb_documents.xlsx')
|
|
|
|
with open(filepath, 'rb') as myfile:
|
|
|
|
file_data = myfile.read()
|
|
|
|
data = {'file': (io.BytesIO(file_data), file_name)}
|
2020-03-13 19:03:57 +00:00
|
|
|
rv = self.app.put('/v1.0/reference_file/%s' % file_name, data=data, follow_redirects=True,
|
2020-03-24 18:15:21 +00:00
|
|
|
content_type='multipart/form-data', headers=self.logged_in_headers())
|
2021-07-06 17:10:20 +00:00
|
|
|
self.assert_success(rv)
|
2020-03-13 19:03:57 +00:00
|
|
|
rv = self.app.get('/v1.0/reference_file',
|
|
|
|
follow_redirects=True,
|
2020-03-24 18:15:21 +00:00
|
|
|
content_type="application/json", headers=self.logged_in_headers())
|
2020-03-13 19:03:57 +00:00
|
|
|
self.assert_success(rv)
|
|
|
|
json_data = json.loads(rv.get_data(as_text=True))
|
|
|
|
self.assertEqual(1, len(json_data))
|
|
|
|
file = FileModelSchema(many=True).load(json_data, session=session)
|
|
|
|
self.assertEqual(file_name, file[0].name)
|
|
|
|
self.assertTrue(file[0].is_reference)
|
|
|
|
|
2020-01-31 16:33:43 +00:00
|
|
|
def test_update_file_info(self):
|
|
|
|
self.load_example_data()
|
2021-07-06 17:10:20 +00:00
|
|
|
self.create_reference_document()
|
|
|
|
file: FileModel = session.query(FileModel).filter(FileModel.is_reference==False).first()
|
2020-01-31 16:33:43 +00:00
|
|
|
file.name = "silly_new_name.bpmn"
|
|
|
|
|
|
|
|
rv = self.app.put('/v1.0/file/%i' % file.id,
|
|
|
|
content_type="application/json",
|
2020-03-24 18:15:21 +00:00
|
|
|
data=json.dumps(FileModelSchema().dump(file)), headers=self.logged_in_headers())
|
2020-01-31 16:33:43 +00:00
|
|
|
self.assert_success(rv)
|
2020-02-04 20:44:06 +00:00
|
|
|
db_file = session.query(FileModel).filter_by(id=file.id).first()
|
2020-01-31 16:33:43 +00:00
|
|
|
self.assertIsNotNone(db_file)
|
|
|
|
self.assertEqual(file.name, db_file.name)
|
|
|
|
|
|
|
|
def test_update_file_data(self):
|
2019-12-27 18:50:03 +00:00
|
|
|
self.load_example_data()
|
2020-01-14 16:45:12 +00:00
|
|
|
spec = session.query(WorkflowSpecModel).first()
|
2019-12-27 18:50:03 +00:00
|
|
|
data = {}
|
2020-04-17 17:30:32 +00:00
|
|
|
data['file'] = io.BytesIO(self.minimal_bpmn("abcdef")), 'my_new_file.bpmn'
|
2020-02-04 20:44:06 +00:00
|
|
|
rv = self.app.post('/v1.0/file?workflow_spec_id=%s' % spec.id, data=data, follow_redirects=True,
|
2020-03-24 18:15:21 +00:00
|
|
|
content_type='multipart/form-data', headers=self.logged_in_headers())
|
2021-04-30 15:55:12 +00:00
|
|
|
file_json = json.loads(rv.get_data(as_text=True))
|
|
|
|
self.assertEquals(80, file_json['size'])
|
2019-12-27 18:50:03 +00:00
|
|
|
|
2020-04-17 17:30:32 +00:00
|
|
|
data['file'] = io.BytesIO(self.minimal_bpmn("efghijk")), 'my_new_file.bpmn'
|
2021-04-30 15:55:12 +00:00
|
|
|
rv = self.app.put('/v1.0/file/%i/data' % file_json['id'], data=data, follow_redirects=True,
|
2020-03-24 18:15:21 +00:00
|
|
|
content_type='multipart/form-data', headers=self.logged_in_headers())
|
2019-12-27 18:50:03 +00:00
|
|
|
self.assert_success(rv)
|
|
|
|
self.assertIsNotNone(rv.get_data())
|
2020-05-29 00:03:50 +00:00
|
|
|
file_json = json.loads(rv.get_data(as_text=True))
|
|
|
|
self.assertEqual(2, file_json['latest_version'])
|
|
|
|
self.assertEqual(FileType.bpmn.value, file_json['type'])
|
|
|
|
self.assertEqual("application/octet-stream", file_json['content_type'])
|
2021-04-30 15:55:12 +00:00
|
|
|
self.assertEqual(spec.id, file_json['workflow_spec_id'])
|
2019-12-27 18:50:03 +00:00
|
|
|
|
2020-03-26 16:51:53 +00:00
|
|
|
# Assure it is updated in the database and properly persisted.
|
2021-04-30 15:55:12 +00:00
|
|
|
file_model = session.query(FileModel).filter(FileModel.id == file_json['id']).first()
|
2020-05-29 00:03:50 +00:00
|
|
|
file_data = FileService.get_file_data(file_model.id)
|
|
|
|
self.assertEqual(2, file_data.version)
|
2020-03-26 16:51:53 +00:00
|
|
|
|
2021-04-30 15:55:12 +00:00
|
|
|
rv = self.app.get('/v1.0/file/%i/data' % file_json['id'], headers=self.logged_in_headers())
|
2020-03-04 18:40:25 +00:00
|
|
|
self.assert_success(rv)
|
|
|
|
data = rv.get_data()
|
|
|
|
self.assertIsNotNone(data)
|
2020-04-17 17:30:32 +00:00
|
|
|
self.assertEqual(self.minimal_bpmn("efghijk"), data)
|
2020-03-04 18:40:25 +00:00
|
|
|
|
|
|
|
def test_update_with_same_exact_data_does_not_increment_version(self):
|
|
|
|
self.load_example_data()
|
|
|
|
spec = session.query(WorkflowSpecModel).first()
|
|
|
|
data = {}
|
2020-04-17 17:30:32 +00:00
|
|
|
data['file'] = io.BytesIO(self.minimal_bpmn("abcdef")), 'my_new_file.bpmn'
|
2020-03-04 18:40:25 +00:00
|
|
|
rv = self.app.post('/v1.0/file?workflow_spec_id=%s' % spec.id, data=data, follow_redirects=True,
|
2020-03-24 18:15:21 +00:00
|
|
|
content_type='multipart/form-data', headers=self.logged_in_headers())
|
2020-03-04 18:40:25 +00:00
|
|
|
self.assertIsNotNone(rv.get_data())
|
|
|
|
json_data = json.loads(rv.get_data(as_text=True))
|
2020-05-29 00:03:50 +00:00
|
|
|
self.assertEqual(1, json_data['latest_version'])
|
2020-04-17 17:30:32 +00:00
|
|
|
data['file'] = io.BytesIO(self.minimal_bpmn("abcdef")), 'my_new_file.bpmn'
|
2020-05-29 00:03:50 +00:00
|
|
|
rv = self.app.put('/v1.0/file/%i/data' % json_data['id'], data=data, follow_redirects=True,
|
2020-03-24 18:15:21 +00:00
|
|
|
content_type='multipart/form-data', headers=self.logged_in_headers())
|
2020-03-04 18:40:25 +00:00
|
|
|
self.assertIsNotNone(rv.get_data())
|
|
|
|
json_data = json.loads(rv.get_data(as_text=True))
|
2020-05-29 00:03:50 +00:00
|
|
|
self.assertEqual(1, json_data['latest_version'])
|
2020-03-04 18:40:25 +00:00
|
|
|
|
2019-12-27 18:50:03 +00:00
|
|
|
def test_get_file(self):
|
|
|
|
self.load_example_data()
|
2020-01-14 16:45:12 +00:00
|
|
|
spec = session.query(WorkflowSpecModel).first()
|
|
|
|
file = session.query(FileModel).filter_by(workflow_spec_id=spec.id).first()
|
2020-03-24 18:15:21 +00:00
|
|
|
rv = self.app.get('/v1.0/file/%i/data' % file.id, headers=self.logged_in_headers())
|
2019-12-27 18:50:03 +00:00
|
|
|
self.assert_success(rv)
|
2020-01-23 20:35:51 +00:00
|
|
|
self.assertEqual("text/xml; charset=utf-8", rv.content_type)
|
2019-12-27 18:50:03 +00:00
|
|
|
self.assertTrue(rv.content_length > 1)
|
|
|
|
|
2021-06-08 12:03:14 +00:00
|
|
|
def test_get_file_contains_data_store_elements(self):
|
|
|
|
self.load_example_data()
|
|
|
|
spec = session.query(WorkflowSpecModel).first()
|
|
|
|
file = session.query(FileModel).filter_by(workflow_spec_id=spec.id).first()
|
|
|
|
ds = DataStoreModel(key="my_key", value="my_value", file_id=file.id);
|
|
|
|
db.session.add(ds)
|
|
|
|
rv = self.app.get('/v1.0/file/%i' % file.id, headers=self.logged_in_headers())
|
|
|
|
self.assert_success(rv)
|
|
|
|
json_data = json.loads(rv.get_data(as_text=True))
|
|
|
|
self.assertEqual("my_value", json_data['data_store']['my_key'])
|
|
|
|
|
2021-04-23 20:14:23 +00:00
|
|
|
def test_get_files_for_form_field_returns_only_those_files(self):
|
|
|
|
self.create_reference_document()
|
|
|
|
workflow = self.create_workflow('file_upload_form')
|
|
|
|
processor = WorkflowProcessor(workflow)
|
|
|
|
processor.do_engine_steps()
|
|
|
|
task = processor.next_task()
|
|
|
|
correct_name = task.task_spec.form.fields[0].id
|
|
|
|
|
|
|
|
data = {'file': (io.BytesIO(b"abcdef"), 'random_fact.svg')}
|
|
|
|
rv = self.app.post('/v1.0/file?study_id=%i&workflow_id=%s&task_id=%i&form_field_key=%s' %
|
|
|
|
(workflow.study_id, workflow.id, task.id, correct_name), data=data, follow_redirects=True,
|
|
|
|
content_type='multipart/form-data', headers=self.logged_in_headers())
|
|
|
|
self.assert_success(rv)
|
|
|
|
|
|
|
|
# Note: this call can be made WITHOUT the task id.
|
|
|
|
rv = self.app.get('/v1.0/file?study_id=%i&workflow_id=%s&form_field_key=%s' %
|
|
|
|
(workflow.study_id, workflow.id, correct_name), follow_redirects=True,
|
|
|
|
content_type='multipart/form-data', headers=self.logged_in_headers())
|
|
|
|
self.assert_success(rv)
|
|
|
|
json_data = json.loads(rv.get_data(as_text=True))
|
|
|
|
self.assertEqual(len(json_data), 1)
|
|
|
|
|
|
|
|
# Add another file for a different document type
|
|
|
|
FileService().add_workflow_file(workflow.id, 'Study_App_Doc', 'otherdoc.docx',
|
|
|
|
'application/xcode', b"asdfasdf")
|
|
|
|
|
|
|
|
# Note: this call can be made WITHOUT the task id.
|
|
|
|
rv = self.app.get('/v1.0/file?study_id=%i&workflow_id=%s&form_field_key=%s' %
|
|
|
|
(workflow.study_id, workflow.id, correct_name), follow_redirects=True,
|
|
|
|
content_type='multipart/form-data', headers=self.logged_in_headers())
|
|
|
|
self.assert_success(rv)
|
|
|
|
json_data = json.loads(rv.get_data(as_text=True))
|
|
|
|
self.assertEqual(len(json_data), 1)
|
|
|
|
|
2021-06-22 18:58:52 +00:00
|
|
|
def test_add_file_returns_document_metadata(self):
|
|
|
|
self.create_reference_document()
|
|
|
|
workflow = self.create_workflow('file_upload_form_single')
|
|
|
|
processor = WorkflowProcessor(workflow)
|
|
|
|
processor.do_engine_steps()
|
|
|
|
task = processor.next_task()
|
|
|
|
correct_name = task.task_spec.form.fields[0].id
|
|
|
|
|
|
|
|
data = {'file': (io.BytesIO(b"abcdef"), 'random_fact.svg')}
|
|
|
|
rv = self.app.post('/v1.0/file?study_id=%i&workflow_id=%s&task_id=%i&form_field_key=%s' %
|
|
|
|
(workflow.study_id, workflow.id, task.id, correct_name), data=data, follow_redirects=True,
|
|
|
|
content_type='multipart/form-data', headers=self.logged_in_headers())
|
|
|
|
self.assert_success(rv)
|
|
|
|
json_data = json.loads(rv.get_data(as_text=True))
|
|
|
|
self.assertEqual('Ancillary Document', json_data['document']['category1'])
|
|
|
|
self.assertEqual('CRC', json_data['document']['Who Uploads?'])
|
2021-04-23 20:14:23 +00:00
|
|
|
|
2019-12-27 18:50:03 +00:00
|
|
|
def test_delete_file(self):
|
|
|
|
self.load_example_data()
|
2020-01-14 16:45:12 +00:00
|
|
|
spec = session.query(WorkflowSpecModel).first()
|
|
|
|
file = session.query(FileModel).filter_by(workflow_spec_id=spec.id).first()
|
2020-12-28 22:33:38 +00:00
|
|
|
file_id = file.id
|
2020-03-24 18:15:21 +00:00
|
|
|
rv = self.app.get('/v1.0/file/%i' % file.id, headers=self.logged_in_headers())
|
2019-12-27 18:50:03 +00:00
|
|
|
self.assert_success(rv)
|
2020-03-24 18:15:21 +00:00
|
|
|
rv = self.app.delete('/v1.0/file/%i' % file.id, headers=self.logged_in_headers())
|
2020-12-28 22:33:38 +00:00
|
|
|
db.session.flush()
|
|
|
|
rv = self.app.get('/v1.0/file/%i' % file_id, headers=self.logged_in_headers())
|
2019-12-27 18:50:03 +00:00
|
|
|
self.assertEqual(404, rv.status_code)
|
2020-03-26 16:51:53 +00:00
|
|
|
|
2020-06-03 21:34:27 +00:00
|
|
|
|
|
|
|
|
2020-04-17 17:30:32 +00:00
|
|
|
def test_change_primary_bpmn(self):
|
|
|
|
self.load_example_data()
|
|
|
|
spec = session.query(WorkflowSpecModel).first()
|
|
|
|
data = {}
|
|
|
|
data['file'] = io.BytesIO(self.minimal_bpmn("abcdef")), 'my_new_file.bpmn'
|
|
|
|
|
|
|
|
# Add a new BPMN file to the specification
|
|
|
|
rv = self.app.post('/v1.0/file?workflow_spec_id=%s' % spec.id, data=data, follow_redirects=True,
|
|
|
|
content_type='multipart/form-data', headers=self.logged_in_headers())
|
|
|
|
self.assert_success(rv)
|
|
|
|
self.assertIsNotNone(rv.get_data())
|
|
|
|
json_data = json.loads(rv.get_data(as_text=True))
|
|
|
|
file = FileModelSchema().load(json_data, session=session)
|
|
|
|
|
|
|
|
# Delete the primary BPMN file for the workflow.
|
|
|
|
orig_model = session.query(FileModel).\
|
|
|
|
filter(FileModel.primary == True).\
|
|
|
|
filter(FileModel.workflow_spec_id == spec.id).first()
|
|
|
|
rv = self.app.delete('/v1.0/file?file_id=%s' % orig_model.id, headers=self.logged_in_headers())
|
|
|
|
|
|
|
|
|
|
|
|
# Set that new file to be the primary BPMN, assure it has a primary_process_id
|
|
|
|
file.primary = True
|
|
|
|
rv = self.app.put('/v1.0/file/%i' % file.id,
|
|
|
|
content_type="application/json",
|
|
|
|
data=json.dumps(FileModelSchema().dump(file)), headers=self.logged_in_headers())
|
|
|
|
self.assert_success(rv)
|
|
|
|
json_data = json.loads(rv.get_data(as_text=True))
|
|
|
|
self.assertTrue(json_data['primary'])
|
|
|
|
self.assertIsNotNone(json_data['primary_process_id'])
|
|
|
|
|