the tests kind of try and run now ...

This commit is contained in:
Dan 2022-02-07 12:18:32 -05:00
parent ff9b2694ae
commit 1c384a78e9
4 changed files with 64 additions and 91 deletions

View File

@ -14,7 +14,7 @@ class TaskEventModel(db.Model):
study_id = db.Column(db.Integer, db.ForeignKey('study.id'))
user_uid = db.Column(db.String, nullable=False) # In some cases the unique user id may not exist in the db yet.
workflow_id = db.Column(db.Integer, db.ForeignKey('workflow.id'), nullable=False)
workflow_spec_id = db.Column(db.String, db.ForeignKey('workflow_spec.id'))
workflow_spec_id = db.Column(db.String)
spec_version = db.Column(db.String)
action = db.Column(db.String)
task_id = db.Column(db.String)

View File

@ -79,7 +79,7 @@ class WorkflowModel(db.Model):
status = db.Column(db.Enum(WorkflowStatus))
study_id = db.Column(db.Integer, db.ForeignKey('study.id'))
study = db.relationship("StudyModel", backref='workflow')
workflow_spec_id = db.Column(db.String, db.ForeignKey('workflow_spec.id'))
workflow_spec_id = db.Column(db.String)
total_tasks = db.Column(db.Integer, default=0)
completed_tasks = db.Column(db.Integer, default=0)
last_updated = db.Column(db.DateTime(timezone=True), server_default=func.now())

View File

@ -54,6 +54,9 @@ class WorkflowSpecService(FileSystemService):
raise ApiError('unknown spec', 'unable to find a spec with id:' + spec_id)
return self.specs[spec_id]
def get_specs(self):
return list(self.specs.values())
def reorder_spec(self, spec:WorkflowSpecInfo, direction):
workflows = spec.category.workflows
workflows.sort(key=lambda w: w.display_order)
@ -70,23 +73,42 @@ class WorkflowSpecService(FileSystemService):
return workflows
def get_libraries(self) -> List[WorkflowSpecInfo]:
# fixme
pass
spec_list = list(self.libraries.values())
spec_list.sort(key=lambda w: w.display_order)
return spec_list
def get_standalones(self) -> List[WorkflowSpecInfo]:
spec_list = list(self.standalone.values())
spec_list.sort(key=lambda w: w.display_order)
return spec_list
def get_categories(self) -> List[WorkflowSpecCategory]:
pass
"""Returns the categories as a list in display order"""
cat_list = list(self.categories.values())
cat_list.sort(key=lambda w: w.display_order)
return cat_list
def get_category(self, category_id) -> WorkflowSpecCategory:
pass
if category_id not in self.categories:
raise ApiError('unknown category', 'unable to find a category with id:' + category_id)
return self.categories[category_id]
def add_category(self, category: WorkflowSpecCategory):
pass
self.update_category(category)
def update_category(self, category: WorkflowSpecCategory):
pass
cat_path = self.category_path(category.display_name)
os.makedirs(os.path.dirname(cat_path), exist_ok=True)
json_path = os.path.join(cat_path, self.CAT_JSON_FILE)
with open(json_path, "w") as cat_json:
json.dump(self.CAT_SCHEMA.dump(category), cat_json, indent=4)
self.scan_file_system()
def delete_category(self, category: WorkflowSpecCategory):
pass
def delete_category(self, category_id: str):
if category_id in self.specs:
path = self.category_path(category_id)
shutil.rmtree(path)
self.scan_file_system()
def reorder_workflow_spec_category(self, spec:WorkflowSpecInfo, direction):
# Fixme: Resort Workflow categories
@ -101,18 +123,20 @@ class WorkflowSpecService(FileSystemService):
self.master_spec = None
self.libraries = {}
if not os.path.exists(FileSystemService.root_path()):
raise ApiError('missing_specs', 'The path for workflow specifications does not exist.')
return # Nothing to scan yet. There are no files.
directory_items = os.scandir(FileSystemService.root_path())
for item in directory_items:
if item.is_dir():
if item.name == self.LIBRARY_SPECS:
self.scan_category(item, is_library=True)
if item.name == self.STAND_ALONE_SPECS:
self.scan_category(item, is_standalone=True)
elif item.name == self.MASTER_SPECIFICATION:
self.scan_spec(item, is_master=True)
else:
self.scan_category(item)
def scan_category(self, dir_item: os.DirEntry, is_library=False):
def scan_category(self, dir_item: os.DirEntry, is_library=False, is_standalone=False):
"""Reads the category.json file, and any workflow directories """
cat_path = os.path.join(dir_item.path, self.CAT_JSON_FILE)
if os.path.exists(cat_path):
@ -125,6 +149,8 @@ class WorkflowSpecService(FileSystemService):
json.dump(self.CAT_SCHEMA.dump(cat), wf_json, indent=4)
if is_library:
self.libraries = cat
elif is_standalone:
self.standalone = cat
else:
self.categories[cat.id] = cat
workflow_dirs = os.scandir(FileSystemService.root_path())
@ -141,7 +167,6 @@ class WorkflowSpecService(FileSystemService):
# workflow_metas.append(WorkflowMetadata.from_workflow(workflow))
return workflow_metas
def scan_spec(self, dir_item: os.DirEntry, is_master=False, category=None):
if not is_master and not category:
raise ApiError("invalid_spec_dir", "Please specify what category this workflow belongs to.")
@ -153,15 +178,16 @@ class WorkflowSpecService(FileSystemService):
else:
spec = WorkflowSpecInfo(id=dir_item.name, library=False, standalone=False, is_master_spec=is_master,
display_name=dir_item.name, description="", primary_process_id="",
primary_file_name="")
primary_file_name="", category_name="", display_order=0, is_review=False,
libraries=[])
with open(spec_path, "w") as wf_json:
json.dump(WorkflowSpecInfoSchema.dump(spec), wf_json, indent=4)
json.dump(self.WF_SCHEMA.dump(spec), wf_json, indent=4)
if is_master:
self.master_spec = spec
elif category:
self.specs[spec.id] = spec
spec.category = category
category.workflow_specs.append(spec)
category.workflows.append(spec)
def set_primary_bpmn(self, workflow_spec: WorkflowSpecInfo, file_name: str, binary_data=None):
# If this is a BPMN, extract the process id, and determine if it is contains swim lanes.

View File

@ -4,11 +4,10 @@ import shutil
from tests.base_test import BaseTest
from crc.services.workflow_sync import WorkflowSyncService
from crc.models.workflow import WorkflowSpecInfo, WorkflowSpecCategory
from crc.services.file_system_service import FileSystemService
from crc.services.spec_file_service import SpecFileService
from crc import db, app
from crc import db, app, WorkflowSpecService
class TestWorkflowSync(BaseTest):
@ -16,12 +15,12 @@ class TestWorkflowSync(BaseTest):
spec_path = FileSystemService.root_path()
import_spec_path = os.path.join(app.root_path, '..', 'tests', 'data', 'IMPORT_TEST')
def set_up_file_system(self):
"""Some tests rely on a well populated file system and an empty database to start"""
def copy_files_to_file_system(self):
"""Some tests rely on a well populated file system """
shutil.copytree(self.import_spec_path, self.spec_path)
def set_up_database(self):
"""Some tests rely on a well populated database and an empty file system to start"""
def build_file_system_from_models(self):
"""Some tests check to see what happens when we write data to an empty file system."""
# Construct Two Categories, with one workflow in first category, two in the second.
# assure that the data in categories.json is correct
@ -35,77 +34,25 @@ class TestWorkflowSync(BaseTest):
self.load_test_spec('email', category_id=c1.id, library=True)
def test_from_file_system_blank_slate(self):
self.assertEquals(0, len(db.session.query(WorkflowSpecModel).all()))
self.assertEquals(0, len(db.session.query(WorkflowSpecCategoryModel).all()))
self.set_up_file_system()
WorkflowSyncService().from_file_system()
self.assertEquals(2, len(db.session.query(WorkflowSpecCategoryModel).all()))
self.assertEquals(5, len(db.session.query(WorkflowSpecModel).all()))
self.assertEquals(1, len(db.session.query(WorkflowSpecModel).filter(WorkflowSpecModel.category_id == 1).all()))
self.assertEquals(2, len(db.session.query(WorkflowSpecModel).filter(WorkflowSpecModel.category_id == 2).all()))
self.assertEquals(1, len(db.session.query(WorkflowSpecModel).filter(WorkflowSpecModel.is_master_spec).all()))
self.assertEquals(1, len(db.session.query(WorkflowSpecModel).filter(WorkflowSpecModel.library).all()))
# The top level workflow, has a library
tlw = db.session.query(WorkflowSpecModel).filter(WorkflowSpecModel.is_master_spec).first()
self.assertEquals(1, len(tlw.libraries))
def test_repeated_imports(self):
self.set_up_file_system()
WorkflowSyncService().from_file_system()
WorkflowSyncService().from_file_system()
WorkflowSyncService().from_file_system()
self.assertEquals(2, len(db.session.query(WorkflowSpecCategoryModel).all()))
self.assertEquals(5, len(db.session.query(WorkflowSpecModel).all()))
service = WorkflowSpecService()
self.assertEquals(0, len(service.get_categories()))
self.assertEquals(0, len(service.get_specs()))
self.copy_files_to_file_system()
self.assertEquals(2, len(service.get_categories()))
self.assertEquals(5, len(service.get_specs()))
self.assertEquals(1, len(service.get_category('Category Number One').workflows))
self.assertEquals(2, len(service.get_category('Category Number Two').workflows))
self.assertIsNotNone(service.master_spec)
self.assertEquals(1, len(service.get_libraries()))
self.assertEquals(1, len(service.master_spec.libraries))
def test_delete_category_and_workflows(self):
self.set_up_file_system()
WorkflowSyncService().from_file_system()
self.copy_files_to_file_system()
service = WorkflowSpecService()
cat_path = SpecFileService().category_path('Category Number One')
shutil.rmtree(cat_path)
WorkflowSyncService().from_file_system()
self.assertEquals(1, len(db.session.query(WorkflowSpecCategoryModel).all()))
self.assertEquals(4, len(db.session.query(WorkflowSpecModel).all()))
self.assertEquals(0, len(db.session.query(WorkflowSpecModel).filter(WorkflowSpecModel.category_id == 1).all()))
WorkflowSyncService().to_file_system()
json_path = os.path.join(FileSystemService.root_path(), "categories.json")
with open(json_path) as json_file:
data = json.load(json_file)
self.assertEquals(1, len(data['categories']), "When the json file is written back to disk, there is only one category now.")
def test_to_file_system(self):
"""Assure we get the basic paths on the file system that we would expect."""
self.assertFalse(os.path.exists(self.spec_path))
self.set_up_database()
self.assertEqual(4, len(os.listdir(self.spec_path)), "Adding workflows should create dir structure")
WorkflowSyncService().to_file_system()
self.assertEqual(5, len(os.listdir(self.spec_path)), "Sync service should create categories.json file")
def test_to_file_system_correct_categories(self):
"""Assure we have two categories in the json file, and that these directories exist, and contain
workflow.json files for each workflow."""
self.set_up_database()
WorkflowSyncService().to_file_system()
json_path = os.path.join(self.spec_path, 'categories.json')
with open(json_path) as json_file:
data = json.load(json_file)
self.assertTrue('categories' in data)
self.assertEqual(2, len(data['categories']))
counter = 0
for c in data['categories']:
cat_path = os.path.join(self.spec_path, c['display_name'])
self.assertTrue(os.path.exists(cat_path), "The category directories exist.")
self.assertEqual(data['categories'][counter]['display_order'], counter, "Order is correct")
counter += 1
workflow_dirs = os.listdir(cat_path)
for wd in workflow_dirs:
wf_json_path = os.path.join(cat_path, wd, 'workflow.json')
self.assertTrue(os.path.exists(wf_json_path), "A workflow.json file should exist.")
# Fixme: Assure the master workflow spec, and Libraries are also exported to file system.
shutil.rmtree(cat_path) # Remove the path, as if from a git pull and the path was removed.
self.assertEquals(2, len(service.get_categories()))
self.assertEquals(4, len(service.get_specs()))
# Todo:
# * What if category json files, and directories don't match?
# * Test renaming a category
# * Test moving a workflow to a different category