Refactor the workflow_spec_service so it doesn't cache anything, it always reads what it needs from the file system.
This commit is contained in:
parent
cf71f68ad9
commit
f12c4aba52
|
@ -151,7 +151,8 @@ def delete_workflow_specification(spec_id):
|
|||
if spec is None:
|
||||
raise ApiError('unknown_spec', 'The Workflow Specification "' + spec_id + '" is not recognized.')
|
||||
spec_service.delete_spec(spec_id)
|
||||
spec_service.cleanup_workflow_spec_display_order(spec.category_id)
|
||||
category = spec_service.get_category(spec.category_id) # Reload the category, or cleanup may re-create the spec.
|
||||
spec_service.cleanup_workflow_spec_display_order(category)
|
||||
|
||||
|
||||
def reorder_workflow_specification(spec_id, direction):
|
||||
|
|
|
@ -16,6 +16,12 @@ class WorkflowSpecCategory(object):
|
|||
self.workflows = [] # For storing Workflow Metadata
|
||||
self.specs = [] # For the list of specifications associated with a category
|
||||
|
||||
def __eq__(self, other):
|
||||
if not isinstance(other, WorkflowSpecCategory):
|
||||
return False
|
||||
if other.id == self.id:
|
||||
return True
|
||||
return False
|
||||
|
||||
class WorkflowSpecCategorySchema(ma.Schema):
|
||||
class Meta:
|
||||
|
@ -44,6 +50,14 @@ class WorkflowSpecInfo(object):
|
|||
self.libraries = libraries
|
||||
self.category_id = category_id
|
||||
|
||||
def __eq__(self, other):
|
||||
if not isinstance(other, WorkflowSpecInfo):
|
||||
return False
|
||||
if other.id == self.id:
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
class WorkflowSpecInfoSchema(ma.Schema):
|
||||
class Meta:
|
||||
model = WorkflowSpecInfo
|
||||
|
|
|
@ -62,7 +62,6 @@ Takes two arguments:
|
|||
file_data = None
|
||||
if workflow is not None:
|
||||
workflow_spec_service = WorkflowSpecService()
|
||||
workflow_spec_service.scan_file_system()
|
||||
spec = workflow_spec_service.get_spec(workflow.workflow_spec_id)
|
||||
file_data = SpecFileService().get_data(spec, file_name)
|
||||
|
||||
|
|
|
@ -50,7 +50,7 @@ class FileSystemService(object):
|
|||
return os.path.join(FileSystemService.root_path(), FileSystemService.MASTER_SPECIFICATION)
|
||||
else:
|
||||
category_path = FileSystemService.category_path_for_spec(spec)
|
||||
return os.path.join(category_path, spec.display_name)
|
||||
return os.path.join(category_path, spec.id)
|
||||
|
||||
def next_display_order(self, spec):
|
||||
path = self.category_path_for_spec(spec)
|
||||
|
|
|
@ -107,7 +107,6 @@ class WorkflowProcessor(object):
|
|||
self.workflow_spec_service = WorkflowSpecService()
|
||||
spec = None
|
||||
if workflow_model.bpmn_workflow_json is None:
|
||||
self.workflow_spec_service.scan_file_system()
|
||||
spec_info = self.workflow_spec_service.get_spec(workflow_model.workflow_spec_id)
|
||||
if spec_info is None:
|
||||
raise (ApiError("missing_spec", "The spec this workflow references does not currently exist."))
|
||||
|
|
|
@ -570,7 +570,6 @@ class WorkflowService(object):
|
|||
navigation = processor.bpmn_workflow.get_deep_nav_list()
|
||||
WorkflowService.update_navigation(navigation, processor)
|
||||
spec_service = WorkflowSpecService()
|
||||
spec_service.scan_file_system()
|
||||
spec = spec_service.get_spec(processor.workflow_spec_id)
|
||||
workflow_api = WorkflowApi(
|
||||
id=processor.get_workflow_id(),
|
||||
|
@ -768,7 +767,6 @@ class WorkflowService(object):
|
|||
workflow = db.session.query(WorkflowModel). \
|
||||
filter(WorkflowModel.id == spiff_task.workflow.data['workflow_id']).first()
|
||||
spec_service = WorkflowSpecService()
|
||||
spec_service.scan_file_system()
|
||||
data = SpecFileService.get_data(spec_service.get_spec(workflow.workflow_spec_id), doc_file_name)
|
||||
raw_doc = data.decode("utf-8")
|
||||
except ApiError:
|
||||
|
|
|
@ -3,70 +3,80 @@ import os
|
|||
import shutil
|
||||
from typing import List
|
||||
|
||||
from SpiffWorkflow.bpmn.parser.ValidationException import ValidationException
|
||||
from lxml import etree
|
||||
|
||||
from crc.api.common import ApiError
|
||||
from crc.models.file import FileType
|
||||
from crc.models.workflow import WorkflowSpecCategory, WorkflowSpecCategorySchema, WorkflowSpecInfo, \
|
||||
WorkflowSpecInfoSchema
|
||||
from crc.services.file_system_service import FileSystemService
|
||||
|
||||
|
||||
class WorkflowSpecService(FileSystemService):
|
||||
|
||||
"""This is a way of persisting json files to the file system in a way that mimics the data
|
||||
as it would have been stored in the database. This is specific to Workflow Specifications, and
|
||||
Workflow Specification categories.
|
||||
We do this, so we can easily drop in a new configuration on the file system, and change all
|
||||
the workflow specs at once, or manage those file in a git repository. """
|
||||
|
||||
CAT_SCHEMA = WorkflowSpecCategorySchema()
|
||||
WF_SCHEMA = WorkflowSpecInfoSchema()
|
||||
|
||||
"""We store details about the specifications and categories on the file system.
|
||||
This service handles changes and persistence of workflow specs and category specs.
|
||||
"""
|
||||
def __init__(self):
|
||||
self.categories = {}
|
||||
self.specs = {}
|
||||
self.master_spec = None
|
||||
self.libraries = {}
|
||||
self.standalone = {}
|
||||
self.scan_file_system()
|
||||
|
||||
def add_spec(self, spec: WorkflowSpecInfo):
|
||||
display_order = self.next_display_order(spec)
|
||||
spec.display_order = display_order
|
||||
self.update_spec(spec)
|
||||
|
||||
def update_spec(self, spec:WorkflowSpecInfo, rescan=True):
|
||||
def update_spec(self, spec:WorkflowSpecInfo):
|
||||
spec_path = self.workflow_path(spec)
|
||||
if(spec.is_master_spec or spec.library or spec.standalone):
|
||||
if spec.is_master_spec or spec.library or spec.standalone:
|
||||
spec.category_id = ""
|
||||
os.makedirs(spec_path, exist_ok=True)
|
||||
json_path = os.path.join(spec_path, self.WF_JSON_FILE)
|
||||
with open(json_path, "w") as wf_json:
|
||||
json.dump(self.WF_SCHEMA.dump(spec), wf_json, indent=4)
|
||||
if rescan:
|
||||
self.scan_file_system()
|
||||
|
||||
def delete_spec(self, spec_id: str):
|
||||
if spec_id in self.specs:
|
||||
spec = self.specs[spec_id]
|
||||
if spec.library:
|
||||
self.remove_library_references(spec.id)
|
||||
path = self.workflow_path(spec)
|
||||
shutil.rmtree(path)
|
||||
self.scan_file_system()
|
||||
spec = self.get_spec(spec_id)
|
||||
if not spec:
|
||||
return
|
||||
if spec.library:
|
||||
self.__remove_library_references(spec.id)
|
||||
path = self.workflow_path(spec)
|
||||
shutil.rmtree(path)
|
||||
|
||||
def remove_library_references(self, spec_id):
|
||||
def __remove_library_references(self, spec_id):
|
||||
for spec in self.get_specs():
|
||||
if spec_id in spec.libraries:
|
||||
spec.libraries.remove(spec_id)
|
||||
self.update_spec(spec, rescan=False)
|
||||
self.scan_file_system()
|
||||
self.update_spec(spec)
|
||||
|
||||
def get_spec(self, spec_id: str):
|
||||
if spec_id not in self.specs:
|
||||
return None
|
||||
return self.specs[spec_id]
|
||||
@property
|
||||
def master_spec(self):
|
||||
return self.get_master_spec()
|
||||
|
||||
def get_master_spec(self):
|
||||
path = os.path.join(FileSystemService.root_path(), FileSystemService.MASTER_SPECIFICATION)
|
||||
return self.__scan_spec(path, FileSystemService.MASTER_SPECIFICATION)
|
||||
|
||||
def get_spec(self, spec_id):
|
||||
if not os.path.exists(FileSystemService.root_path()):
|
||||
return # Nothing to scan yet. There are no files.
|
||||
with os.scandir(FileSystemService.root_path()) as category_dirs:
|
||||
for item in category_dirs:
|
||||
category_dir = item
|
||||
if item.is_dir():
|
||||
with os.scandir(item.path) as spec_dirs:
|
||||
for sd in spec_dirs:
|
||||
if sd.name == spec_id:
|
||||
# Now we have the category direcotry, and spec directory
|
||||
category = self.__scan_category(category_dir)
|
||||
return self.__scan_spec(sd.path, sd.name, category)
|
||||
|
||||
def get_specs(self):
|
||||
return list(self.specs.values())
|
||||
categories = self.get_categories()
|
||||
specs = []
|
||||
for cat in categories:
|
||||
specs.extend(cat.specs)
|
||||
return specs
|
||||
|
||||
def reorder_spec(self, spec:WorkflowSpecInfo, direction):
|
||||
specs = spec.category.specs
|
||||
|
@ -76,11 +86,10 @@ class WorkflowSpecService(FileSystemService):
|
|||
specs[index-1], specs[index] = specs[index], specs[index-1]
|
||||
if direction == 'down' and index < len(specs)-1:
|
||||
specs[index+1], specs[index] = specs[index], specs[index+1]
|
||||
return self.cleanup_workflow_spec_display_order(spec.category_id)
|
||||
return self.cleanup_workflow_spec_display_order(spec.category)
|
||||
|
||||
def cleanup_workflow_spec_display_order(self, category_id):
|
||||
def cleanup_workflow_spec_display_order(self, category):
|
||||
index = 0
|
||||
category = self.get_category(category_id)
|
||||
if not category:
|
||||
return []
|
||||
for workflow in category.specs:
|
||||
|
@ -89,52 +98,48 @@ class WorkflowSpecService(FileSystemService):
|
|||
index += 1
|
||||
return category.specs
|
||||
|
||||
def get_libraries(self) -> List[WorkflowSpecInfo]:
|
||||
spec_list = self.libraries.specs
|
||||
spec_list.sort(key=lambda w: w.display_order)
|
||||
return spec_list
|
||||
|
||||
def get_standalones(self) -> List[WorkflowSpecInfo]:
|
||||
spec_list = self.standalone.specs
|
||||
spec_list.sort(key=lambda w: w.display_order)
|
||||
return spec_list
|
||||
|
||||
def get_categories(self) -> List[WorkflowSpecCategory]:
|
||||
"""Returns the categories as a list in display order"""
|
||||
cat_list = list(self.categories.values())
|
||||
cat_list = self.__scan_categories()
|
||||
cat_list.sort(key=lambda w: w.display_order)
|
||||
return cat_list
|
||||
|
||||
def get_category(self, category_id) -> WorkflowSpecCategory:
|
||||
if category_id not in self.categories:
|
||||
return None
|
||||
return self.categories[category_id]
|
||||
def get_libraries(self) -> List[WorkflowSpecInfo]:
|
||||
return self.get_category(self.LIBRARY_SPECS).specs
|
||||
|
||||
def get_standalones(self) -> List[WorkflowSpecInfo]:
|
||||
return self.get_category(self.STAND_ALONE_SPECS).specs
|
||||
|
||||
def get_category(self, category_id):
|
||||
"""Look for a given category, and return it."""
|
||||
if not os.path.exists(FileSystemService.root_path()):
|
||||
return # Nothing to scan yet. There are no files.
|
||||
with os.scandir(FileSystemService.root_path()) as directory_items:
|
||||
for item in directory_items:
|
||||
if item.is_dir() and item.name == category_id:
|
||||
return self.__scan_category(item)
|
||||
|
||||
def add_category(self, category: WorkflowSpecCategory):
|
||||
display_order = len(self.get_categories())
|
||||
category.display_order = display_order
|
||||
return self.update_category(category)
|
||||
|
||||
def update_category(self, category: WorkflowSpecCategory, rescan=True):
|
||||
def update_category(self, category: WorkflowSpecCategory):
|
||||
cat_path = self.category_path(category.id)
|
||||
os.makedirs(cat_path, exist_ok=True)
|
||||
json_path = os.path.join(cat_path, self.CAT_JSON_FILE)
|
||||
with open(json_path, "w") as cat_json:
|
||||
json.dump(self.CAT_SCHEMA.dump(category), cat_json, indent=4)
|
||||
if rescan:
|
||||
self.scan_file_system()
|
||||
return self.categories[category.id]
|
||||
return category
|
||||
|
||||
def delete_category(self, category_id: str):
|
||||
if category_id in self.categories:
|
||||
path = self.category_path(category_id)
|
||||
path = self.category_path(category_id)
|
||||
if os.path.exists(path):
|
||||
shutil.rmtree(path)
|
||||
self.scan_file_system()
|
||||
self.cleanup_category_display_order()
|
||||
self.scan_file_system()
|
||||
|
||||
def reorder_workflow_spec_category(self, cat: WorkflowSpecCategory, direction):
|
||||
cats = self.get_categories() # Returns an ordered list
|
||||
cats = self.get_categories() # Returns an ordered list
|
||||
index = cats.index(cat)
|
||||
if direction == 'up' and index > 0:
|
||||
cats[index-1], cats[index] = cats[index], cats[index-1]
|
||||
|
@ -143,47 +148,39 @@ class WorkflowSpecService(FileSystemService):
|
|||
index = 0
|
||||
for category in cats:
|
||||
category.display_order = index
|
||||
self.update_category(category, rescan=False)
|
||||
self.update_category(category)
|
||||
index += 1
|
||||
return cats
|
||||
|
||||
def cleanup_category_display_order(self):
|
||||
cats = self.get_categories() # Returns an ordered list
|
||||
cats = self.get_categories() # Returns an ordered list
|
||||
index = 0
|
||||
for category in cats:
|
||||
category.display_order = index
|
||||
self.update_category(category, rescan=False)
|
||||
self.update_category(category)
|
||||
index += 1
|
||||
return cats
|
||||
|
||||
def scan_file_system(self):
|
||||
"""Build a model of our workflows, based on the file system structure and json files"""
|
||||
|
||||
# Clear out existing values
|
||||
self.categories = {}
|
||||
self.specs = {}
|
||||
self.master_spec = None
|
||||
self.libraries = {}
|
||||
self.standalone = {}
|
||||
|
||||
def __scan_categories(self):
|
||||
if not os.path.exists(FileSystemService.root_path()):
|
||||
return # Nothing to scan yet. There are no files.
|
||||
return [] # Nothing to scan yet. There are no files.
|
||||
|
||||
directory_items = os.scandir(FileSystemService.root_path())
|
||||
for item in directory_items:
|
||||
if item.is_dir():
|
||||
if item.name == self.REFERENCE_FILES:
|
||||
continue
|
||||
elif item.name == self.LIBRARY_SPECS:
|
||||
self.scan_category(item, is_library=True)
|
||||
elif item.name == self.STAND_ALONE_SPECS:
|
||||
self.scan_category(item, is_standalone=True)
|
||||
elif item.name == self.MASTER_SPECIFICATION:
|
||||
self.scan_spec(item, is_master=True)
|
||||
else:
|
||||
self.scan_category(item)
|
||||
with os.scandir(FileSystemService.root_path()) as directory_items:
|
||||
categories = []
|
||||
for item in directory_items:
|
||||
if item.is_dir():
|
||||
if item.name == self.REFERENCE_FILES:
|
||||
continue
|
||||
elif item.name == self.MASTER_SPECIFICATION:
|
||||
continue
|
||||
elif item.name == self.LIBRARY_SPECS:
|
||||
continue
|
||||
elif item.name == self.STAND_ALONE_SPECS:
|
||||
continue
|
||||
categories.append(self.__scan_category(item))
|
||||
return categories
|
||||
|
||||
def scan_category(self, dir_item: os.DirEntry, is_library=False, is_standalone=False):
|
||||
def __scan_category(self, dir_item: os.DirEntry):
|
||||
"""Reads the category.json file, and any workflow directories """
|
||||
cat_path = os.path.join(dir_item.path, self.CAT_JSON_FILE)
|
||||
if os.path.exists(cat_path):
|
||||
|
@ -194,17 +191,12 @@ class WorkflowSpecService(FileSystemService):
|
|||
cat = WorkflowSpecCategory(id=dir_item.name, display_name=dir_item.name, display_order=10000, admin=False)
|
||||
with open(cat_path, "w") as wf_json:
|
||||
json.dump(self.CAT_SCHEMA.dump(cat), wf_json, indent=4)
|
||||
if is_library:
|
||||
self.libraries = cat
|
||||
elif is_standalone:
|
||||
self.standalone = cat
|
||||
else:
|
||||
self.categories[cat.id] = cat
|
||||
workflow_dirs = os.scandir(dir_item.path)
|
||||
for item in workflow_dirs:
|
||||
if item.is_dir():
|
||||
self.scan_spec(item, category=cat)
|
||||
cat.specs.sort(key=lambda w: w.display_order)
|
||||
with os.scandir(dir_item.path) as workflow_dirs:
|
||||
cat.specs = []
|
||||
for item in workflow_dirs:
|
||||
if item.is_dir():
|
||||
cat.specs.append(self.__scan_spec(item.path, item.name, category=cat))
|
||||
cat.specs.sort(key=lambda w: w.display_order)
|
||||
return cat
|
||||
|
||||
@staticmethod
|
||||
|
@ -216,26 +208,21 @@ class WorkflowSpecService(FileSystemService):
|
|||
# workflow_metas.append(WorkflowMetadata.from_workflow(workflow))
|
||||
return workflow_metas
|
||||
|
||||
def scan_spec(self, dir_item: os.DirEntry, is_master=False, category=None):
|
||||
if not is_master and not category:
|
||||
raise ApiError("invalid_spec_dir", "Please specify what category this workflow belongs to.")
|
||||
spec_path = os.path.join(dir_item.path, self.WF_JSON_FILE)
|
||||
def __scan_spec(self, path, name, category=None):
|
||||
spec_path = os.path.join(path, self.WF_JSON_FILE)
|
||||
is_master = FileSystemService.MASTER_SPECIFICATION in spec_path
|
||||
|
||||
if os.path.exists(spec_path):
|
||||
with open(spec_path) as wf_json:
|
||||
data = json.load(wf_json)
|
||||
spec = self.WF_SCHEMA.load(data)
|
||||
else:
|
||||
spec = WorkflowSpecInfo(id=dir_item.name, library=False, standalone=False, is_master_spec=is_master,
|
||||
display_name=dir_item.name, description="", primary_process_id="",
|
||||
spec = WorkflowSpecInfo(id=name, library=False, standalone=False, is_master_spec=is_master,
|
||||
display_name=name, description="", primary_process_id="",
|
||||
primary_file_name="", display_order=0, is_review=False,
|
||||
libraries=[])
|
||||
with open(spec_path, "w") as wf_json:
|
||||
json.dump(self.WF_SCHEMA.dump(spec), wf_json, indent=4)
|
||||
if is_master:
|
||||
self.master_spec = spec
|
||||
elif category:
|
||||
if category:
|
||||
spec.category = category
|
||||
category.specs.append(spec)
|
||||
self.specs[spec.id] = spec
|
||||
|
||||
|
||||
return spec
|
||||
|
|
|
@ -151,6 +151,7 @@ class BaseTest(unittest.TestCase):
|
|||
we built up developing crc, use_rrt_data will do the same for hte rrt project,
|
||||
otherwise it depends on a small setup for running tests."""
|
||||
from example_data import ExampleDataLoader
|
||||
# Fixme: Is this really necissary? We already do it after every single test!
|
||||
ExampleDataLoader.clean_db()
|
||||
|
||||
# # If in production mode, only add the first user.
|
||||
|
@ -307,7 +308,6 @@ class BaseTest(unittest.TestCase):
|
|||
|
||||
def create_workflow(self, dir_name, display_name=None, study=None, category_id=None, as_user="dhf8r"):
|
||||
session.flush()
|
||||
self.workflow_spec_service.scan_file_system()
|
||||
spec = self.workflow_spec_service.get_spec(dir_name)
|
||||
if spec is None:
|
||||
if display_name is None:
|
||||
|
@ -361,7 +361,6 @@ class BaseTest(unittest.TestCase):
|
|||
# workflow_in should be a workflow, not a workflow_api
|
||||
# we were passing in workflow_api in many of our tests, and
|
||||
# this caused problems testing standalone workflows
|
||||
self.workflow_spec_service.scan_file_system()
|
||||
spec = self.workflow_spec_service.get_spec(workflow_in.workflow_spec_id)
|
||||
standalone = getattr(spec, 'standalone', False)
|
||||
prev_completed_task_count = workflow_in.completed_tasks
|
||||
|
|
|
@ -13,8 +13,9 @@ from io import BytesIO
|
|||
class TestDeleteTaskData(BaseTest):
|
||||
|
||||
def test_delete_task_data_validation(self):
|
||||
self.load_example_data()
|
||||
self.load_test_spec('empty_workflow', master_spec=True)
|
||||
spec_model = self.load_test_spec('delete_task_data')
|
||||
self.create_reference_document()
|
||||
rv = self.app.get('/v1.0/workflow-specification/%s/validate' % spec_model.id, headers=self.logged_in_headers())
|
||||
# Make sure we don't get json returned. This would indicate an error.
|
||||
self.assertEqual([], rv.json)
|
||||
|
|
|
@ -97,7 +97,6 @@ class TestFileService(BaseTest):
|
|||
binary_data=b'5678')
|
||||
|
||||
def test_add_file_from_form_allows_multiple_files_with_different_names(self):
|
||||
self.load_example_data()
|
||||
workflow = self.create_workflow('file_upload_form')
|
||||
processor = WorkflowProcessor(workflow)
|
||||
task = processor.next_task()
|
||||
|
|
|
@ -42,7 +42,8 @@ class TestSpecFileService(BaseTest):
|
|||
orig = SpecFileService.get_files(spec_random, "random_fact.bpmn")[0]
|
||||
new = SpecFileService.get_files(spec_dt, "random_fact.bpmn")[0]
|
||||
self.assertEqual(orig.size, new.size)
|
||||
self.assertNotEqual(orig.last_modified, new.last_modified)
|
||||
# This next line happens too fast now, so we can't verify the dates are different
|
||||
#self.assertNotEqual(orig.last_modified, new.last_modified)
|
||||
|
||||
def test_set_primary_bpmn(self):
|
||||
self.load_example_data()
|
||||
|
|
|
@ -6,7 +6,8 @@ from unittest.mock import patch
|
|||
class TestCheckStudy(BaseTest):
|
||||
|
||||
def test_check_study_script_validation(self):
|
||||
self.load_example_data()
|
||||
self.load_test_spec('empty_workflow', master_spec=True)
|
||||
self.create_reference_document()
|
||||
spec_model = self.load_test_spec('check_study_script')
|
||||
rv = self.app.get('/v1.0/workflow-specification/%s/validate' % spec_model.id, headers=self.logged_in_headers())
|
||||
self.assertEqual([], rv.json)
|
||||
|
|
|
@ -7,7 +7,8 @@ from crc.services.email_service import EmailService
|
|||
class TestGetEmailData(BaseTest):
|
||||
|
||||
def test_email_data_validation(self):
|
||||
self.load_example_data()
|
||||
self.load_test_spec('empty_workflow', master_spec=True)
|
||||
self.create_reference_document()
|
||||
spec_model = self.load_test_spec('get_email_data')
|
||||
rv = self.app.get('/v1.0/workflow-specification/%s/validate' % spec_model.id, headers=self.logged_in_headers())
|
||||
self.assertEqual([], rv.json)
|
||||
|
|
|
@ -8,11 +8,11 @@ from crc import app
|
|||
class TestGetStudyAssociateValidation(BaseTest):
|
||||
@patch('crc.services.protocol_builder.ProtocolBuilderService.get_investigators')
|
||||
def test_get_study_associate_validation(self, mock):
|
||||
self.load_test_spec('empty_workflow', master_spec=True)
|
||||
self.create_reference_document()
|
||||
response = self.protocol_builder_response('investigators.json')
|
||||
mock.return_value = json.loads(response)
|
||||
|
||||
app.config['PB_ENABLED'] = True
|
||||
self.load_example_data()
|
||||
workflow = self.create_workflow('get_study_associate')
|
||||
rv = self.app.get('/v1.0/workflow-specification/%s/validate' % workflow.workflow_spec_id,
|
||||
headers=self.logged_in_headers())
|
||||
|
|
|
@ -7,7 +7,8 @@ from crc.models.workflow import WorkflowModel
|
|||
class TestGetWorkflowStatus(BaseTest):
|
||||
|
||||
def test_get_workflow_status_validation(self):
|
||||
self.load_example_data()
|
||||
self.load_test_spec('empty_workflow', master_spec=True)
|
||||
self.create_reference_document()
|
||||
spec_model = self.load_test_spec('get_workflow_status')
|
||||
rv = self.app.get('/v1.0/workflow-specification/%s/validate' % spec_model.id, headers=self.logged_in_headers())
|
||||
self.assertEqual([], rv.json)
|
||||
|
|
|
@ -6,7 +6,8 @@ from crc.models.study import ProgressStatus
|
|||
class TestSetStudyProgressStatus(BaseTest):
|
||||
|
||||
def test_set_study_progress_status_validation(self):
|
||||
self.load_example_data()
|
||||
self.load_test_spec('empty_workflow', master_spec=True)
|
||||
self.create_reference_document()
|
||||
spec_model = self.load_test_spec('set_study_progress_status')
|
||||
rv = self.app.get('/v1.0/workflow-specification/%s/validate' % spec_model.id, headers=self.logged_in_headers())
|
||||
# The workflow has an enum option that causes an exception.
|
||||
|
|
|
@ -17,7 +17,8 @@ import types
|
|||
class TestTaskLogging(BaseTest):
|
||||
|
||||
def test_logging_validation(self):
|
||||
self.load_example_data()
|
||||
self.load_test_spec('empty_workflow', master_spec=True)
|
||||
self.create_reference_document()
|
||||
spec_model = self.load_test_spec('logging_task')
|
||||
rv = self.app.get('/v1.0/workflow-specification/%s/validate' % spec_model.id, headers=self.logged_in_headers())
|
||||
self.assertEqual([], rv.json)
|
||||
|
@ -35,7 +36,8 @@ class TestTaskLogging(BaseTest):
|
|||
self.assertEqual('Activity_LogEvent', log_model.task)
|
||||
|
||||
def test_get_logging_validation(self):
|
||||
self.load_example_data()
|
||||
self.load_test_spec('empty_workflow', master_spec=True)
|
||||
self.create_reference_document()
|
||||
spec_model = self.load_test_spec('get_logging')
|
||||
rv = self.app.get('/v1.0/workflow-specification/%s/validate' % spec_model.id, headers=self.logged_in_headers())
|
||||
self.assertEqual([], rv.json)
|
||||
|
|
|
@ -112,7 +112,6 @@ class TestStudyApi(BaseTest):
|
|||
self.assertEqual(study["ind_number"], db_study.ind_number)
|
||||
self.assertEqual(study["user_uid"], db_study.user_uid)
|
||||
|
||||
self.workflow_spec_service.scan_file_system()
|
||||
workflow_spec_count = len(self.workflow_spec_service.get_specs())
|
||||
workflow_count = session.query(WorkflowModel).filter(WorkflowModel.study_id == study['id']).count()
|
||||
self.assertEqual(workflow_spec_count, workflow_count)
|
||||
|
|
|
@ -24,7 +24,6 @@ class TestStudyService(BaseTest):
|
|||
cat = WorkflowSpecCategory(id="approvals", display_name="Approvals", display_order=0, admin=False)
|
||||
self.workflow_spec_service.add_category(cat)
|
||||
self.load_test_spec("random_fact", category_id=cat.id)
|
||||
self.workflow_spec_service.scan_file_system()
|
||||
user = self.create_user()
|
||||
study = StudyModel(title="My title", status=StudyStatus.in_progress, user_uid=user.uid)
|
||||
db.session.add(study)
|
||||
|
|
|
@ -10,7 +10,8 @@ class TestEmailScript(BaseTest):
|
|||
# This validates scripts.email.do_task_validate_only
|
||||
# It also tests that we don't overwrite the default email_address with random text during validation
|
||||
# Otherwise json would have an error about parsing the email address
|
||||
self.load_example_data()
|
||||
self.load_test_spec('empty_workflow', master_spec=True)
|
||||
self.create_reference_document()
|
||||
spec_model = self.load_test_spec('email_script')
|
||||
rv = self.app.get('/v1.0/workflow-specification/%s/validate' % spec_model.id, headers=self.logged_in_headers())
|
||||
self.assertEqual([], rv.json)
|
||||
|
|
|
@ -189,7 +189,6 @@ class TestTasksApi(BaseTest):
|
|||
# Modify the specification, with a major change that alters the flow and can't be deserialized
|
||||
# effectively, if it uses the latest spec files.
|
||||
file_path = os.path.join(app.root_path, '..', 'tests', 'data', 'two_forms', 'modified', 'two_forms_struc_mod.bpmn')
|
||||
self.workflow_spec_service.scan_file_system()
|
||||
spec = self.workflow_spec_service.get_spec('two_forms')
|
||||
self.replace_file(spec, "two_forms.bpmn", file_path)
|
||||
|
||||
|
|
|
@ -70,15 +70,12 @@ class TestWorkflowApi(BaseTest):
|
|||
content_type="application/json",
|
||||
headers=self.logged_in_headers())
|
||||
returned=rv.json
|
||||
self.workflow_spec_service.scan_file_system()
|
||||
spec1 = self.workflow_spec_service.get_spec('hello_world')
|
||||
self.assertIn('hello_world_lib', spec1.libraries)
|
||||
|
||||
rv = self.app.delete(f'/v1.0/workflow-specification/%s'%(spec2.id),follow_redirects=True,
|
||||
content_type="application/json",
|
||||
headers=self.logged_in_headers())
|
||||
|
||||
self.workflow_spec_service.scan_file_system()
|
||||
spec1 = self.workflow_spec_service.get_spec('hello_world')
|
||||
self.assertNotIn('hello_world_lib', spec1.libraries)
|
||||
|
||||
|
|
|
@ -6,19 +6,18 @@ import json
|
|||
class TestCustomerError(BaseTest):
|
||||
|
||||
def test_customer_error(self):
|
||||
|
||||
self.load_example_data()
|
||||
self.load_test_spec('empty_workflow', master_spec=True)
|
||||
self.create_reference_document()
|
||||
spec_model = self.load_test_spec('failing_gateway_workflow')
|
||||
|
||||
rv = self.app.get('/v1.0/workflow-specification/%s/validate' % spec_model.id, headers=self.logged_in_headers())
|
||||
self.assertIn('hint', rv.json[0])
|
||||
json_data = json.loads(rv.get_data(as_text=True))
|
||||
self.assertEqual('Add a Condition Type to your gateway path.', json_data[0]['hint'])
|
||||
|
||||
def test_extension_error(self):
|
||||
self.load_example_data()
|
||||
self.load_test_spec('empty_workflow', master_spec=True)
|
||||
self.create_reference_document()
|
||||
spec_model = self.load_test_spec('extension_error')
|
||||
|
||||
rv = self.app.get('/v1.0/workflow-specification/%s/validate' % spec_model.id, headers=self.logged_in_headers())
|
||||
self.assertIn('hint', rv.json[0])
|
||||
json_data = json.loads(rv.get_data(as_text=True))
|
||||
|
|
|
@ -4,6 +4,8 @@ from tests.base_test import BaseTest
|
|||
class TestEnumCheckbox(BaseTest):
|
||||
|
||||
def test_enum_checkbox_validation(self):
|
||||
self.load_test_spec('empty_workflow', master_spec=True)
|
||||
self.create_reference_document()
|
||||
spec_model = self.load_test_spec('enum_checkbox')
|
||||
rv = self.app.get('/v1.0/workflow-specification/%s/validate' % spec_model.id, headers=self.logged_in_headers())
|
||||
self.assertEqual([], rv.json)
|
||||
|
|
|
@ -39,6 +39,8 @@ class TestWorkflowEnumDefault(BaseTest):
|
|||
self.assertEqual('white', workflow_api.next_task.data['color_select'])
|
||||
|
||||
def test_enum_value_expression_and_default(self):
|
||||
self.load_test_spec('empty_workflow', master_spec=True)
|
||||
self.create_reference_document()
|
||||
spec_model = self.load_test_spec('enum_value_expression_fail')
|
||||
rv = self.app.get('/v1.0/workflow-specification/%s/validate' % spec_model.id, headers=self.logged_in_headers())
|
||||
|
||||
|
|
|
@ -8,7 +8,8 @@ import json
|
|||
class TestEmptyEnumList(BaseTest):
|
||||
|
||||
def test_empty_enum_list(self):
|
||||
|
||||
self.load_test_spec('empty_workflow', master_spec=True)
|
||||
self.create_reference_document()
|
||||
spec_model = self.load_test_spec('enum_empty_list')
|
||||
rv = self.app.get('/v1.0/workflow-specification/%s/validate' % spec_model.id, headers=self.logged_in_headers())
|
||||
json_data = json.loads(rv.get_data(as_text=True))
|
||||
|
|
|
@ -5,6 +5,8 @@ from tests.base_test import BaseTest
|
|||
class TestFormFieldName(BaseTest):
|
||||
|
||||
def test_form_field_name(self):
|
||||
self.load_test_spec('empty_workflow', master_spec=True)
|
||||
self.create_reference_document()
|
||||
spec_model = self.load_test_spec('workflow_form_field_name')
|
||||
rv = self.app.get('/v1.0/workflow-specification/%s/validate' % spec_model.id, headers=self.logged_in_headers())
|
||||
|
||||
|
|
|
@ -5,6 +5,8 @@ from tests.base_test import BaseTest
|
|||
class TestFormFieldType(BaseTest):
|
||||
|
||||
def test_form_field_type(self):
|
||||
self.load_test_spec('empty_workflow', master_spec=True)
|
||||
self.create_reference_document()
|
||||
spec_model = self.load_test_spec('workflow_form_field_type')
|
||||
rv = self.app.get('/v1.0/workflow-specification/%s/validate' % spec_model.id, headers=self.logged_in_headers())
|
||||
|
||||
|
|
|
@ -7,6 +7,8 @@ class TestWorkflowHiddenRequiredField(BaseTest):
|
|||
def test_require_default(self):
|
||||
# We have a field that can be hidden and required.
|
||||
# Validation should fail if we don't have a default value.
|
||||
self.load_test_spec('empty_workflow', master_spec=True)
|
||||
self.create_reference_document()
|
||||
spec_model = self.load_test_spec('hidden_required_field')
|
||||
rv = self.app.get('/v1.0/workflow-specification/%s/validate' % spec_model.id, headers=self.logged_in_headers())
|
||||
|
||||
|
@ -17,11 +19,15 @@ class TestWorkflowHiddenRequiredField(BaseTest):
|
|||
self.assertIn('Field "name" is required but can be hidden', json_data[0]['message'])
|
||||
|
||||
def test_require_default_pass(self):
|
||||
self.load_test_spec('empty_workflow', master_spec=True)
|
||||
self.create_reference_document()
|
||||
spec_model = self.load_test_spec('hidden_required_field_pass')
|
||||
rv = self.app.get('/v1.0/workflow-specification/%s/validate' % spec_model.id, headers=self.logged_in_headers())
|
||||
self.assertEqual(0, len(rv.json))
|
||||
|
||||
def test_require_default_pass_expression(self):
|
||||
self.load_test_spec('empty_workflow', master_spec=True)
|
||||
self.create_reference_document()
|
||||
spec_model = self.load_test_spec('hidden_required_field_pass_expression')
|
||||
rv = self.app.get('/v1.0/workflow-specification/%s/validate' % spec_model.id, headers=self.logged_in_headers())
|
||||
self.assertEqual(0, len(rv.json))
|
||||
|
|
|
@ -11,7 +11,8 @@ class TestWorkflowInfiniteLoop(BaseTest):
|
|||
app.config['PB_ENABLED'] = True
|
||||
mock_get.return_value.ok = True
|
||||
mock_get.return_value.text = self.protocol_builder_response('investigators.json')
|
||||
self.load_example_data()
|
||||
self.load_test_spec('empty_workflow', master_spec=True)
|
||||
self.create_reference_document()
|
||||
spec_model = self.load_test_spec('infinite_loop')
|
||||
rv = self.app.get('/v1.0/workflow-specification/%s/validate' % spec_model.id, headers=self.logged_in_headers())
|
||||
json_data = json.loads(rv.get_data(as_text=True))
|
||||
|
|
|
@ -5,7 +5,8 @@ import json
|
|||
class TestMissingFormKey(BaseTest):
|
||||
|
||||
def test_missing_form_key(self):
|
||||
|
||||
self.load_test_spec('empty_workflow', master_spec=True)
|
||||
self.create_reference_document()
|
||||
spec_model = self.load_test_spec('missing_form_key')
|
||||
rv = self.app.get('/v1.0/workflow-specification/%s/validate' % spec_model.id, headers=self.logged_in_headers())
|
||||
json_data = json.loads(rv.get_data(as_text=True))
|
||||
|
|
|
@ -8,6 +8,8 @@ class TestMissingLibrary(BaseTest):
|
|||
def test_missing_library(self):
|
||||
"""This workflow calls a library that does not exist,
|
||||
we validate the workflow, and assert that our error service hint is in the error message."""
|
||||
self.load_test_spec('empty_workflow', master_spec=True)
|
||||
self.create_reference_document()
|
||||
workflow = self.create_workflow('missing_library')
|
||||
rv = self.app.get('/v1.0/workflow-specification/%s/validate' % workflow.workflow_spec_id, headers=self.logged_in_headers())
|
||||
json_data = json.loads(rv.get_data(as_text=True))
|
||||
|
|
|
@ -5,6 +5,8 @@ import json
|
|||
class TestNameErrorHint(BaseTest):
|
||||
|
||||
def test_name_error_hint(self):
|
||||
self.load_test_spec('empty_workflow', master_spec=True)
|
||||
self.create_reference_document()
|
||||
self.load_example_data()
|
||||
spec_model = self.load_test_spec('script_with_name_error')
|
||||
rv = self.app.get('/v1.0/workflow-specification/%s/validate' % spec_model.id, headers=self.logged_in_headers())
|
||||
|
|
|
@ -6,7 +6,8 @@ from crc.api.common import ApiError
|
|||
class TestWorkflowReset(BaseTest):
|
||||
|
||||
def test_workflow_reset_validation(self):
|
||||
self.load_example_data()
|
||||
self.load_test_spec('empty_workflow', master_spec=True)
|
||||
self.create_reference_document()
|
||||
spec_model = self.load_test_spec('reset_workflow')
|
||||
rv = self.app.get('/v1.0/workflow-specification/%s/validate' % spec_model.id, headers=self.logged_in_headers())
|
||||
self.assertEqual([], rv.json)
|
||||
|
|
|
@ -28,7 +28,6 @@ class TestWorkflowSpec(BaseTest):
|
|||
self.assertEqual(spec.description, spec2.description)
|
||||
|
||||
def test_add_new_workflow_specification(self):
|
||||
self.workflow_spec_service.scan_file_system()
|
||||
self.assertEqual(0, len(self.workflow_spec_service.get_specs()))
|
||||
self.assertEqual(0, len(self.workflow_spec_service.get_categories()))
|
||||
cat = WorkflowSpecCategory(id="test_cat", display_name="Test Category", display_order=0, admin=False)
|
||||
|
@ -42,7 +41,7 @@ class TestWorkflowSpec(BaseTest):
|
|||
content_type="application/json",
|
||||
data=json.dumps(WorkflowSpecInfoSchema().dump(spec)))
|
||||
self.assert_success(rv)
|
||||
self.workflow_spec_service.scan_file_system()
|
||||
|
||||
fs_spec = self.workflow_spec_service.get_spec('make_cookies')
|
||||
self.assertEqual(spec.display_name, fs_spec.display_name)
|
||||
self.assertEqual(0, fs_spec.display_order)
|
||||
|
@ -55,7 +54,7 @@ class TestWorkflowSpec(BaseTest):
|
|||
self.assert_success(rv)
|
||||
json_data = json.loads(rv.get_data(as_text=True))
|
||||
api_spec = WorkflowSpecInfoSchema().load(json_data)
|
||||
self.workflow_spec_service.scan_file_system()
|
||||
|
||||
fs_spec = self.workflow_spec_service.get_spec('random_fact')
|
||||
self.assertEqual(WorkflowSpecInfoSchema().dump(fs_spec), json_data)
|
||||
|
||||
|
@ -65,7 +64,7 @@ class TestWorkflowSpec(BaseTest):
|
|||
category_id = 'a_trap'
|
||||
category = WorkflowSpecCategory(id=category_id, display_name="It's a trap!", display_order=0, admin=False)
|
||||
self.workflow_spec_service.add_category(category)
|
||||
self.workflow_spec_service.scan_file_system()
|
||||
|
||||
spec_before: WorkflowSpecInfo = self.workflow_spec_service.get_spec('random_fact')
|
||||
self.assertNotEqual(spec_before.category_id, category_id)
|
||||
|
||||
|
@ -79,7 +78,7 @@ class TestWorkflowSpec(BaseTest):
|
|||
api_spec = WorkflowSpecInfoSchema().load(json_data)
|
||||
self.assertEqual(WorkflowSpecInfoSchema().dump(spec_before), json_data)
|
||||
|
||||
self.workflow_spec_service.scan_file_system()
|
||||
|
||||
spec_after: WorkflowSpecInfo = self.workflow_spec_service.get_spec('random_fact')
|
||||
self.assertIsNotNone(spec_after.category_id)
|
||||
self.assertIsNotNone(spec_after.category_id, category_id)
|
||||
|
@ -91,7 +90,7 @@ class TestWorkflowSpec(BaseTest):
|
|||
workflow = self.create_workflow(spec_id)
|
||||
workflow_api = self.get_workflow_api(workflow)
|
||||
workflow_path = SpecFileService.workflow_path(spec)
|
||||
self.workflow_spec_service.scan_file_system()
|
||||
|
||||
num_specs_before = len(self.workflow_spec_service.get_specs())
|
||||
self.assertEqual(num_specs_before, 1)
|
||||
num_files_before = len(SpecFileService.get_files(spec))
|
||||
|
@ -99,7 +98,7 @@ class TestWorkflowSpec(BaseTest):
|
|||
self.assertGreater(num_files_before + num_workflows_before, 0)
|
||||
rv = self.app.delete('/v1.0/workflow-specification/' + spec_id, headers=self.logged_in_headers())
|
||||
self.assert_success(rv)
|
||||
self.workflow_spec_service.scan_file_system()
|
||||
|
||||
num_specs_after = len(self.workflow_spec_service.get_specs())
|
||||
self.assertEqual(0, num_specs_after)
|
||||
|
||||
|
@ -109,12 +108,12 @@ class TestWorkflowSpec(BaseTest):
|
|||
self.assertEqual(num_workflows_after, 1)
|
||||
|
||||
def test_display_order_after_delete_spec(self):
|
||||
self.workflow_spec_service.scan_file_system()
|
||||
|
||||
self.load_test_spec('random_fact')
|
||||
self.load_test_spec('decision_table')
|
||||
self.load_test_spec('email')
|
||||
|
||||
self.workflow_spec_service.scan_file_system()
|
||||
|
||||
all_specs = self.workflow_spec_service.get_categories()[0].specs
|
||||
for i in range(0, 3):
|
||||
self.assertEqual(i, all_specs[i].display_order)
|
||||
|
@ -123,7 +122,7 @@ class TestWorkflowSpec(BaseTest):
|
|||
|
||||
test_order = 0
|
||||
|
||||
self.workflow_spec_service.scan_file_system()
|
||||
|
||||
all_specs = self.workflow_spec_service.get_categories()[0].specs
|
||||
for i in range(0, 2):
|
||||
self.assertEqual(i, all_specs[i].display_order)
|
||||
|
@ -131,7 +130,7 @@ class TestWorkflowSpec(BaseTest):
|
|||
def test_get_standalone_workflow_specs(self):
|
||||
self.load_example_data()
|
||||
self.load_test_spec('random_fact')
|
||||
self.workflow_spec_service.scan_file_system()
|
||||
|
||||
category = self.workflow_spec_service.get_categories()[0]
|
||||
ExampleDataLoader().create_spec('hello_world', 'Hello World', category_id=category.id,
|
||||
standalone=True, from_tests=True)
|
||||
|
@ -159,7 +158,7 @@ class TestWorkflowSpec(BaseTest):
|
|||
data=json.dumps(WorkflowSpecCategorySchema().dump(category))
|
||||
)
|
||||
self.assert_success(rv)
|
||||
self.workflow_spec_service.scan_file_system()
|
||||
|
||||
result = WorkflowSpecCategorySchema().loads(rv.get_data(as_text=True))
|
||||
fs_category = self.workflow_spec_service.get_category('test')
|
||||
self.assertEqual('Another Test Category', result.display_name)
|
||||
|
@ -168,7 +167,7 @@ class TestWorkflowSpec(BaseTest):
|
|||
def test_update_workflow_spec_category(self):
|
||||
self.load_example_data()
|
||||
self.load_test_spec('random_fact')
|
||||
self.workflow_spec_service.scan_file_system()
|
||||
|
||||
category = self.workflow_spec_service.get_categories()[0]
|
||||
display_name_before = category.display_name
|
||||
new_display_name = display_name_before + '_asdf'
|
||||
|
@ -190,7 +189,7 @@ class TestWorkflowSpec(BaseTest):
|
|||
rv = self.app.delete('/v1.0/workflow-specification-category/Test Category 2', headers=self.logged_in_headers())
|
||||
self.assert_success(rv)
|
||||
test_order = 0
|
||||
self.workflow_spec_service.scan_file_system()
|
||||
|
||||
categories = self.workflow_spec_service.get_categories()
|
||||
self.assertEqual(2, len(categories))
|
||||
for test_category in categories:
|
||||
|
@ -200,7 +199,7 @@ class TestWorkflowSpec(BaseTest):
|
|||
def test_add_library_with_category_id(self):
|
||||
self.load_example_data()
|
||||
self.load_test_spec('random_fact')
|
||||
self.workflow_spec_service.scan_file_system()
|
||||
|
||||
category_id = self.workflow_spec_service.get_categories()[0].id
|
||||
spec = WorkflowSpecInfo(id='test_spec', display_name='Test Spec',
|
||||
description='Library with a category id', category_id=category_id,
|
||||
|
|
|
@ -142,16 +142,12 @@ class TestWorkflowSpecReorder(BaseTest):
|
|||
spec_model = specs[0]
|
||||
spec_model.display_order = 1
|
||||
WorkflowSpecService().update_spec(spec_model)
|
||||
# session.add(spec_model)
|
||||
spec_model = specs[1]
|
||||
spec_model.display_order = 1
|
||||
WorkflowSpecService().update_spec(spec_model)
|
||||
# session.add(spec_model)
|
||||
spec_model = specs[2]
|
||||
spec_model.display_order = 1
|
||||
WorkflowSpecService().update_spec(spec_model)
|
||||
# session.add(spec_model)
|
||||
# session.commit()
|
||||
|
||||
bad_specs = WorkflowSpecService().get_specs()
|
||||
bad_specs.sort(key=lambda w: w.display_order)
|
||||
|
@ -167,14 +163,13 @@ class TestWorkflowSpecReorder(BaseTest):
|
|||
rv = self.app.put(f"/v1.0/workflow-specification/test_spec_2/reorder?direction=up",
|
||||
headers=self.logged_in_headers())
|
||||
|
||||
# After moving 2 up, the order should be
|
||||
# test_spec_1, test_spec_2, random_fact, test_spec_3
|
||||
# Make sure we have good display_order numbers too
|
||||
self.assertEqual('test_spec_2', rv.json[0]['id'])
|
||||
self.assertEqual(0, rv.json[0]['display_order'])
|
||||
self.assertEqual('random_fact', rv.json[1]['id'])
|
||||
self.assertEqual(1, rv.json[1]['display_order'])
|
||||
self.assertEqual('test_spec_1', rv.json[2]['id'])
|
||||
self.assertEqual(2, rv.json[2]['display_order'])
|
||||
self.assertEqual('test_spec_3', rv.json[3]['id'])
|
||||
self.assertEqual(3, rv.json[3]['display_order'])
|
||||
# After moving an up, specs should have incremental numbers
|
||||
for i in range(0,3):
|
||||
self.assertEqual(i, rv.json[i]['display_order'])
|
||||
|
||||
second_item_id = rv.json[1]['id']
|
||||
|
||||
#We can now move the second item up one.
|
||||
rv = self.app.put(f"/v1.0/workflow-specification/{second_item_id}/reorder?direction=up",
|
||||
headers=self.logged_in_headers())
|
||||
self.assertEqual(second_item_id, rv.json[0]['id'])
|
||||
|
|
|
@ -30,15 +30,15 @@ class TestWorkflowSync(BaseTest):
|
|||
self.load_test_spec('decision_table', category_id=c2.id)
|
||||
self.load_test_spec('empty_workflow', category_id=c1.id, master_spec=True)
|
||||
self.load_test_spec('email', category_id=c1.id, library=True)
|
||||
# fixme: add a standalone
|
||||
|
||||
def test_from_file_system_blank_slate(self):
|
||||
self.service.scan_file_system()
|
||||
self.assertEquals(0, len(self.service.get_categories()))
|
||||
self.assertEquals(0, len(self.service.get_specs()))
|
||||
|
||||
self.copy_files_to_file_system(self.import_spec_path, self.spec_path)
|
||||
self.service.scan_file_system()
|
||||
self.assertEquals(2, len(self.service.get_categories()))
|
||||
self.assertEquals(5, len(self.service.get_specs()))
|
||||
self.assertEquals(3, len(self.service.get_specs()))
|
||||
self.assertEquals(1, len(self.service.get_category('category_number_one').specs))
|
||||
self.assertEquals(2, len(self.service.get_category('category_number_two').specs))
|
||||
self.assertIsNotNone(self.service.master_spec)
|
||||
|
@ -47,12 +47,10 @@ class TestWorkflowSync(BaseTest):
|
|||
|
||||
def test_delete_category_and_workflows(self):
|
||||
self.copy_files_to_file_system(self.import_spec_path, self.spec_path)
|
||||
self.service.scan_file_system()
|
||||
cat_path = SpecFileService().category_path('Category Number One')
|
||||
cat_path = SpecFileService().category_path('category_number_one')
|
||||
shutil.rmtree(cat_path) # Remove the path, as if from a git pull and the path was removed.
|
||||
self.service.scan_file_system()
|
||||
self.assertEquals(1, len(self.service.get_categories()))
|
||||
self.assertEquals(4, len(self.service.get_specs()))
|
||||
self.assertEquals(2, len(self.service.get_specs()))
|
||||
|
||||
def test_create_file_system(self):
|
||||
self.build_file_system_from_models()
|
||||
|
|
|
@ -24,7 +24,8 @@ class TestWorkflowSpecValidation(BaseTest):
|
|||
|
||||
def test_successful_validation_of_test_workflows(self):
|
||||
app.config['PB_ENABLED'] = False # Assure this is disabled.
|
||||
self.load_example_data()
|
||||
self.load_test_spec('empty_workflow', master_spec=True)
|
||||
self.create_reference_document()
|
||||
self.assertEqual(0, len(self.validate_workflow("parallel_tasks")))
|
||||
self.assertEqual(0, len(self.validate_workflow("decision_table")))
|
||||
self.assertEqual(0, len(self.validate_workflow("docx")))
|
||||
|
@ -36,7 +37,8 @@ class TestWorkflowSpecValidation(BaseTest):
|
|||
self.assertEqual(0, len(self.validate_workflow("ldap_lookup")))
|
||||
|
||||
def test_invalid_expression(self):
|
||||
self.load_example_data()
|
||||
self.load_test_spec('empty_workflow', master_spec=True)
|
||||
self.create_reference_document()
|
||||
errors = self.validate_workflow("invalid_expression")
|
||||
self.assertEqual(1, len(errors))
|
||||
self.assertEqual("workflow_validation_exception", errors[0]['code'])
|
||||
|
@ -49,7 +51,8 @@ class TestWorkflowSpecValidation(BaseTest):
|
|||
self.assertIn("has_bananas", errors[0]['task_data'])
|
||||
|
||||
def test_validation_error(self):
|
||||
self.load_example_data()
|
||||
self.load_test_spec('empty_workflow', master_spec=True)
|
||||
self.create_reference_document()
|
||||
errors = self.validate_workflow("invalid_spec")
|
||||
self.assertEqual(1, len(errors))
|
||||
self.assertEqual("workflow_validation_error", errors[0]['code'])
|
||||
|
@ -58,7 +61,8 @@ class TestWorkflowSpecValidation(BaseTest):
|
|||
|
||||
|
||||
def test_invalid_script(self):
|
||||
self.load_example_data()
|
||||
self.load_test_spec('empty_workflow', master_spec=True)
|
||||
self.create_reference_document()
|
||||
errors = self.validate_workflow("invalid_script")
|
||||
self.assertEqual(1, len(errors))
|
||||
self.assertEqual("workflow_validation_exception", errors[0]['code'])
|
||||
|
@ -68,7 +72,8 @@ class TestWorkflowSpecValidation(BaseTest):
|
|||
self.assertEqual("invalid_script.bpmn", errors[0]['file_name'])
|
||||
|
||||
def test_invalid_script2(self):
|
||||
self.load_example_data()
|
||||
self.load_test_spec('empty_workflow', master_spec=True)
|
||||
self.create_reference_document()
|
||||
errors = self.validate_workflow("invalid_script2")
|
||||
self.assertEqual(1, len(errors))
|
||||
self.assertEqual("workflow_validation_exception", errors[0]['code'])
|
||||
|
@ -80,7 +85,8 @@ class TestWorkflowSpecValidation(BaseTest):
|
|||
self.assertEqual("invalid_script2.bpmn", errors[0]['file_name'])
|
||||
|
||||
def test_invalid_script3(self):
|
||||
self.load_example_data()
|
||||
self.load_test_spec('empty_workflow', master_spec=True)
|
||||
self.create_reference_document()
|
||||
errors = self.validate_workflow("invalid_script3")
|
||||
self.assertEqual(1, len(errors))
|
||||
self.assertEqual("Invalid_Script_Task", errors[0]['task_id'])
|
||||
|
@ -88,14 +94,16 @@ class TestWorkflowSpecValidation(BaseTest):
|
|||
self.assertEqual("NameError", errors[0]['error_type'])
|
||||
|
||||
def test_repeating_sections_correctly_populated(self):
|
||||
self.load_example_data()
|
||||
self.load_test_spec('empty_workflow', master_spec=True)
|
||||
self.create_reference_document()
|
||||
spec_model = self.load_test_spec('repeat_form')
|
||||
final_data = WorkflowService.test_spec(spec_model.id)
|
||||
self.assertIsNotNone(final_data)
|
||||
self.assertIn('cats', final_data)
|
||||
|
||||
def test_required_fields(self):
|
||||
self.load_example_data()
|
||||
self.load_test_spec('empty_workflow', master_spec=True)
|
||||
self.create_reference_document()
|
||||
spec_model = self.load_test_spec('required_fields')
|
||||
final_data = WorkflowService.test_spec(spec_model.id)
|
||||
self.assertIsNotNone(final_data)
|
||||
|
@ -108,7 +116,8 @@ class TestWorkflowSpecValidation(BaseTest):
|
|||
self.assertNotIn('string_not_required', final_data)
|
||||
|
||||
def test_enum_defaults_correctly_populated(self):
|
||||
self.load_example_data()
|
||||
self.load_test_spec('empty_workflow', master_spec=True)
|
||||
self.create_reference_document()
|
||||
spec_model = self.load_test_spec('required_fields')
|
||||
final_data = WorkflowService.test_spec(spec_model.id, required_only=True)
|
||||
self.assertIsNotNone(final_data)
|
||||
|
@ -116,7 +125,8 @@ class TestWorkflowSpecValidation(BaseTest):
|
|||
self.assertEqual('maybe', final_data['enum_with_default'])
|
||||
|
||||
def test_invalid_custom_field(self):
|
||||
self.load_example_data()
|
||||
self.load_test_spec('empty_workflow', master_spec=True)
|
||||
self.create_reference_document()
|
||||
errors = self.validate_workflow("invalid_custom_field")
|
||||
self.assertEqual(1, len(errors))
|
||||
self.assertEqual("invalid_field_type", errors[0]['code'])
|
||||
|
@ -125,8 +135,10 @@ class TestWorkflowSpecValidation(BaseTest):
|
|||
def test_disabled_spec_validation(self, mock_status):
|
||||
"""A disabled workflow spec should fail validation"""
|
||||
app.config['PB_ENABLED'] = True
|
||||
self.load_example_data()
|
||||
self.load_test_spec('empty_workflow', master_spec=True)
|
||||
self.create_reference_document()
|
||||
spec = self.load_test_spec('data_security_plan')
|
||||
workflow = self.create_workflow('data_security_plan')
|
||||
study_model = session.query(StudyModel).first()
|
||||
|
||||
# This response sets the status for data_security_plan to disabled
|
||||
|
@ -148,5 +160,7 @@ class TestWorkflowSpecValidation(BaseTest):
|
|||
# it wasn't converted to an ISO String as it would be if submitted through the API.
|
||||
# subsequent attempts to work with the expected date_string failed, because it was already a date.
|
||||
# This can't happen in the front end code base, but it was breaking validation.
|
||||
self.load_test_spec('empty_workflow', master_spec=True)
|
||||
self.create_reference_document()
|
||||
errors = self.validate_workflow("date_value_expression")
|
||||
self.assertEqual(0, len(errors))
|
||||
|
|
|
@ -36,7 +36,8 @@ class TestValueExpression(BaseTest):
|
|||
def test_validate_task_with_both_default_and_expression(self):
|
||||
# This actually fails validation.
|
||||
# We are testing the error message is correct.
|
||||
self.load_example_data()
|
||||
self.load_test_spec('empty_workflow', master_spec=True)
|
||||
self.create_reference_document()
|
||||
spec_model = self.load_test_spec('default_value_expression')
|
||||
rv = self.app.get('/v1.0/workflow-specification/%s/validate' % spec_model.id, headers=self.logged_in_headers())
|
||||
self.assertEqual('default value and value_expression', rv.json[0]['code'])
|
||||
|
|
Loading…
Reference in New Issue