cr-connect-workflow/tests/files/test_delete_task_data.py

128 lines
6.4 KiB
Python

from tests.base_test import BaseTest
from crc import session
from crc.models.data_store import DataStoreModel
from crc.models.file import FileModel
from crc.models.task_event import TaskEventModel
from crc.services.workflow_service import WorkflowService
from io import BytesIO
class TestDeleteTaskData(BaseTest):
def test_delete_task_data_validation(self):
self.load_example_data()
spec_model = self.load_test_spec('delete_task_data')
rv = self.app.get('/v1.0/workflow-specification/%s/validate' % spec_model.id, headers=self.logged_in_headers())
# Make sure we don't get json returned. This would indicate an error.
self.assertEqual([], rv.json)
def test_delete_task_data(self):
self.load_example_data()
doc_code_1 = 'Study_Protocol_Document'
doc_code_2 = 'Study_App_Doc'
workflow = self.create_workflow('delete_task_data')
# Make sure there are no files uploaded for workflow yet
files = session.query(FileModel).filter(FileModel.workflow_id == workflow.id).all()
self.assertEqual(0, len(files))
workflow_api = self.get_workflow_api(workflow)
first_task = workflow_api.next_task
# Upload Single File
data = {'file': (BytesIO(b"abcdef"), 'test_file.txt')}
rv = self.app.post('/v1.0/file?study_id=%i&workflow_id=%s&task_spec_name=%s&form_field_key=%s' %
(workflow.study_id, workflow.id, first_task.name, doc_code_1), data=data, follow_redirects=True,
content_type='multipart/form-data', headers=self.logged_in_headers())
self.assert_success(rv)
file_id = rv.json['id']
self.complete_form(workflow, first_task, {doc_code_1: {'id': file_id},
'VerDate': '20210721'})
# Make sure we have 1 file
files = session.query(FileModel).filter(FileModel.workflow_id == workflow.id).all()
self.assertEqual(1, len(files))
# Make sure data store is set
data_store = session.query(DataStoreModel).filter(DataStoreModel.file_id == file_id).all()
self.assertEqual('VerDate', data_store[0].key)
self.assertEqual('20210721', data_store[0].value)
workflow_api = self.get_workflow_api(workflow)
second_task = workflow_api.next_task
# Upload 2 Files
data = {'file': (BytesIO(b"abcdef"), 'test_file_1.txt')}
rv = self.app.post('/v1.0/file?study_id=%i&workflow_id=%s&task_spec_name=%s&form_field_key=%s' %
(workflow.study_id, workflow.id, first_task.name, doc_code_2), data=data, follow_redirects=True,
content_type='multipart/form-data', headers=self.logged_in_headers())
self.assert_success(rv)
file_id_1 = rv.json['id']
data = {'file': (BytesIO(b"ghijk"), 'test_file_2.txt')}
rv = self.app.post('/v1.0/file?study_id=%i&workflow_id=%s&task_spec_name=%s&form_field_key=%s' %
(workflow.study_id, workflow.id, first_task.name, doc_code_2), data=data, follow_redirects=True,
content_type='multipart/form-data', headers=self.logged_in_headers())
self.assert_success(rv)
file_id_2 = rv.json['id']
self.complete_form(workflow, second_task, {'StudyAppDoc': [{'Study_App_Doc': {'id': file_id_1},
'VerDate': '20210701',
'ShortDesc': 'Short Description 1'},
{'Study_App_Doc': {'id': file_id_2},
'VerDate': '20210702',
'ShortDesc': 'Short Description 2'}
]})
# Make sure we have 2 more files
files = session.query(FileModel).filter(FileModel.workflow_id == workflow.id).all()
self.assertEqual(3, len(files))
# Make sure data stores are set for new files
data_stores_1 = session.query(DataStoreModel).filter(DataStoreModel.file_id == file_id_1).all()
for data_store in data_stores_1:
if data_store.key == 'VerDate':
self.assertEqual('20210701', data_store.value)
elif data_store.key == 'ShortDesc':
self.assertEqual('Short Description 1', data_store.value)
data_stores_2 = session.query(DataStoreModel).filter(DataStoreModel.file_id == file_id_2).all()
for data_store in data_stores_2:
if data_store.key == 'VerDate':
self.assertEqual('20210702', data_store.value)
elif data_store.key == 'ShortDesc':
self.assertEqual('Short Description 2', data_store.value)
# Make sure we have something in task_events
task_events = session.query(TaskEventModel).\
filter(TaskEventModel.workflow_id == workflow.id).\
filter(TaskEventModel.action == WorkflowService.TASK_ACTION_COMPLETE).all()
for task_event in task_events:
self.assertNotEqual({}, task_event.form_data)
workflow_api = self.get_workflow_api(workflow)
third_task = workflow_api.next_task
# This calls the delete task file data script
self.complete_form(workflow, third_task, {})
self.get_workflow_api(workflow)
# Make sure files, data_stores, and task_events are deleted
data_stores = session.query(DataStoreModel).filter(DataStoreModel.file_id == file_id).all()
data_stores_1 = session.query(DataStoreModel).filter(DataStoreModel.file_id == file_id_1).all()
data_stores_2 = session.query(DataStoreModel).filter(DataStoreModel.file_id == file_id_2).all()
files = session.query(FileModel).filter(FileModel.workflow_id == workflow.id).all()
task_events = session.query(TaskEventModel).\
filter(TaskEventModel.workflow_id == workflow.id).\
filter(TaskEventModel.action == WorkflowService.TASK_ACTION_COMPLETE).all()
self.assertEqual(0, len(data_stores))
self.assertEqual(0, len(data_stores_1))
self.assertEqual(0, len(data_stores_2))
self.assertEqual(0, len(files))
for task_event in task_events:
self.assertEqual({}, task_event.form_data)