cr-connect-workflow/crc/services/workflow_spec_service.py

233 lines
9.0 KiB
Python
Raw Normal View History

import json
import os
2022-02-07 09:12:11 -05:00
import shutil
from typing import List
from SpiffWorkflow.bpmn.parser.ValidationException import ValidationException
from lxml import etree
from crc.api.common import ApiError
from crc.models.file import FileType
from crc.models.workflow import WorkflowSpecCategory, WorkflowSpecCategorySchema, WorkflowSpecInfo, \
WorkflowSpecInfoSchema
from crc.services.file_system_service import FileSystemService
class WorkflowSpecService(FileSystemService):
    """We store details about the specifications and categories on the file system.

    This service handles changes and persistence of workflow specs and category specs.

    An in-memory model (``categories``, ``specs``, ``master_spec``, ``libraries``
    and ``standalone``) is rebuilt from disk by ``scan_file_system``.  Path helpers
    and the directory / file-name constants used below (``workflow_path``,
    ``category_path``, ``root_path``, ``WF_JSON_FILE``, ``CAT_JSON_FILE``, ...) are
    not defined in this class; presumably they come from ``FileSystemService``.
    """

    CAT_SCHEMA = WorkflowSpecCategorySchema()
    WF_SCHEMA = WorkflowSpecInfoSchema()

    def __init__(self):
        """Create the empty model and immediately populate it from disk."""
        self.categories = {}      # category id -> WorkflowSpecCategory
        self.specs = {}           # spec id -> WorkflowSpecInfo
        self.master_spec = None   # spec found in the master specification directory
        self.libraries = {}       # replaced by a category object once the library dir is scanned
        self.standalone = {}      # replaced by a category object once the standalone dir is scanned
        self.scan_file_system()

    # ------------------------------------------------------------------
    # Workflow specifications
    # ------------------------------------------------------------------
    def add_spec(self, spec: WorkflowSpecInfo):
        """Assign the next display order to ``spec`` and persist it."""
        spec.display_order = self.next_display_order(spec)
        self.update_spec(spec)

    def update_spec(self, spec: WorkflowSpecInfo, rescan=True):
        """Write ``spec`` to its json file, creating its directory if needed.

        Master, library and standalone specs have their ``category_id`` reset to
        an empty string before being written.

        :param spec: the specification to persist.
        :param rescan: when true, rebuild the in-memory model afterwards.
        """
        # The path is resolved before category_id is cleared, as in the original ordering.
        spec_path = self.workflow_path(spec)
        if spec.is_master_spec or spec.library or spec.standalone:
            spec.category_id = ""
        os.makedirs(spec_path, exist_ok=True)
        json_path = os.path.join(spec_path, self.WF_JSON_FILE)
        with open(json_path, "w") as wf_json:
            json.dump(self.WF_SCHEMA.dump(spec), wf_json, indent=4)
        if rescan:
            self.scan_file_system()

    def delete_spec(self, spec_id: str):
        """Remove the directory of the spec with ``spec_id`` (if known), then rescan."""
        spec = self.specs.get(spec_id)
        if spec is not None:
            shutil.rmtree(self.workflow_path(spec))
        self.scan_file_system()

    def get_spec(self, spec_id: str):
        """Return the spec registered under ``spec_id``, or None when unknown."""
        return self.specs.get(spec_id)

    def get_specs(self):
        """Return every known spec as a new list."""
        return list(self.specs.values())

    def reorder_spec(self, spec: WorkflowSpecInfo, direction):
        """Move ``spec`` one position 'up' or 'down' inside its category.

        The category's spec list is sorted and swapped in place, then the whole
        category is renumbered and persisted.  Any other ``direction``, or a move
        past either end of the list, only triggers the renumbering.

        NOTE(review): the swap is applied to ``spec.category.specs`` while the
        renumbering reads the category through ``get_category(spec.category_id)``;
        the new order is only persisted when both refer to the same object.

        :return: the category's specs after renumbering.
        """
        specs = spec.category.specs
        specs.sort(key=lambda w: w.display_order)
        index = specs.index(spec)
        if direction == 'up' and index > 0:
            specs[index - 1], specs[index] = specs[index], specs[index - 1]
        if direction == 'down' and index < len(specs) - 1:
            specs[index + 1], specs[index] = specs[index], specs[index + 1]
        return self.cleanup_workflow_spec_display_order(spec.category_id)

    def cleanup_workflow_spec_display_order(self, category_id):
        """Renumber the specs of a category 0..n-1 in their current list order.

        :return: the category's spec list, or an empty list for an unknown category.
        """
        category = self.get_category(category_id)
        if not category:
            return []
        for index, workflow in enumerate(category.specs):
            workflow.display_order = index
            # Defer the rescan: one pass after all files are written is enough.
            self.update_spec(workflow, rescan=False)
        if category.specs:
            self.scan_file_system()
        return category.specs

    @staticmethod
    def _sorted_specs(category) -> List[WorkflowSpecInfo]:
        """Return ``category.specs`` sorted in place by display order.

        ``category`` is still the empty placeholder dict when the matching
        directory has not been scanned; in that case there are no specs.
        """
        spec_list = getattr(category, "specs", None)
        if spec_list is None:
            return []
        spec_list.sort(key=lambda w: w.display_order)
        return spec_list

    def get_libraries(self) -> List[WorkflowSpecInfo]:
        """Return the library specs in display order (empty if none were scanned)."""
        return self._sorted_specs(self.libraries)

    def get_standalones(self) -> List[WorkflowSpecInfo]:
        """Return the standalone specs in display order (empty if none were scanned)."""
        return self._sorted_specs(self.standalone)

    # ------------------------------------------------------------------
    # Categories
    # ------------------------------------------------------------------
    def get_categories(self) -> List[WorkflowSpecCategory]:
        """Returns the categories as a list in display order"""
        return sorted(self.categories.values(), key=lambda w: w.display_order)

    def get_category(self, category_id) -> WorkflowSpecCategory:
        """Return the category registered under ``category_id``, or None when unknown."""
        return self.categories.get(category_id)

    def add_category(self, category: WorkflowSpecCategory):
        """Append ``category`` after the existing categories and persist it."""
        category.display_order = len(self.get_categories())
        return self.update_category(category)

    def update_category(self, category: WorkflowSpecCategory, rescan=True):
        """Write ``category`` to its json file, creating its directory if needed.

        :param category: the category to persist.
        :param rescan: when true, rebuild the in-memory model afterwards.
        :return: the cached category with the same id, or ``category`` itself when
            the cache holds no entry for it (e.g. a new category with rescan=False).
        """
        cat_path = self.category_path(category.id)
        os.makedirs(cat_path, exist_ok=True)
        json_path = os.path.join(cat_path, self.CAT_JSON_FILE)
        with open(json_path, "w") as cat_json:
            json.dump(self.CAT_SCHEMA.dump(category), cat_json, indent=4)
        if rescan:
            self.scan_file_system()
        return self.categories.get(category.id, category)

    def delete_category(self, category_id: str):
        """Remove a known category's directory, then renumber the remaining categories."""
        if category_id in self.categories:
            shutil.rmtree(self.category_path(category_id))
        self.scan_file_system()
        self.cleanup_category_display_order()
        self.scan_file_system()

    def _renumber_categories(self, cats):
        """Set display_order to the list position of each category and persist it."""
        for index, category in enumerate(cats):
            category.display_order = index
            self.update_category(category, rescan=False)
        return cats

    def reorder_workflow_spec_category(self, cat: WorkflowSpecCategory, direction):
        """Move ``cat`` one position 'up' or 'down' among the categories.

        :return: all categories in their new order, renumbered and persisted.
        """
        cats = self.get_categories()  # Returns an ordered list
        index = cats.index(cat)
        if direction == 'up' and index > 0:
            cats[index - 1], cats[index] = cats[index], cats[index - 1]
        if direction == 'down' and index < len(cats) - 1:
            cats[index + 1], cats[index] = cats[index], cats[index + 1]
        return self._renumber_categories(cats)

    def cleanup_category_display_order(self):
        """Renumber all categories 0..n-1 in their current display order."""
        return self._renumber_categories(self.get_categories())

    # ------------------------------------------------------------------
    # File system scanning
    # ------------------------------------------------------------------
    def scan_file_system(self):
        """Build a model of our workflows, based on the file system structure and json files"""
        # Clear out existing values
        self.categories = {}
        self.specs = {}
        self.master_spec = None
        self.libraries = {}
        self.standalone = {}

        root = FileSystemService.root_path()
        if not os.path.exists(root):
            return  # Nothing to scan yet.  There are no files.

        # The context manager closes the directory handle even if a scan raises.
        with os.scandir(root) as directory_items:
            for item in directory_items:
                if not item.is_dir():
                    continue
                if item.name == self.REFERENCE_FILES:
                    continue
                elif item.name == self.LIBRARY_SPECS:
                    self.scan_category(item, is_library=True)
                elif item.name == self.STAND_ALONE_SPECS:
                    self.scan_category(item, is_standalone=True)
                elif item.name == self.MASTER_SPECIFICATION:
                    self.scan_spec(item, is_master=True)
                else:
                    self.scan_category(item)

    def scan_category(self, dir_item: os.DirEntry, is_library=False, is_standalone=False):
        """Reads the category.json file, and any workflow directories

        When the directory has no category json file, a default category named
        after the directory is created and written to disk.

        :param dir_item: the category directory.
        :param is_library: store the result in ``self.libraries``.
        :param is_standalone: store the result in ``self.standalone``.
        :return: the category, with its specs sorted by display order.
        """
        cat_path = os.path.join(dir_item.path, self.CAT_JSON_FILE)
        if os.path.exists(cat_path):
            with open(cat_path) as cat_json:
                data = json.load(cat_json)
                cat = self.CAT_SCHEMA.load(data)
        else:
            cat = WorkflowSpecCategory(id=dir_item.name, display_name=dir_item.name,
                                       display_order=10000, admin=False)
            with open(cat_path, "w") as cat_json:
                json.dump(self.CAT_SCHEMA.dump(cat), cat_json, indent=4)

        if is_library:
            self.libraries = cat
        elif is_standalone:
            self.standalone = cat
        else:
            self.categories[cat.id] = cat

        # Every sub-directory of a category is treated as a workflow spec.
        with os.scandir(dir_item.path) as workflow_dirs:
            for item in workflow_dirs:
                if item.is_dir():
                    self.scan_spec(item, category=cat)
        cat.specs.sort(key=lambda w: w.display_order)
        return cat

    @staticmethod
    def _get_workflow_metas(study_id):
        """Placeholder that currently always returns an empty list."""
        # Add in the Workflows for each category
        # Fixme: moved from the Study Service
        workflow_metas = []
        # for workflow in workflow_models:
        #     workflow_metas.append(WorkflowMetadata.from_workflow(workflow))
        return workflow_metas

    def scan_spec(self, dir_item: os.DirEntry, is_master=False, category=None):
        """Read (or create) the spec json file of one workflow directory.

        When the directory has no spec json file, a default spec named after the
        directory is created and written to disk.  The spec is registered in
        ``self.specs`` and either stored as the master spec or attached to
        ``category``.

        :param dir_item: the workflow directory.
        :param is_master: the directory holds the master specification.
        :param category: the category the spec belongs to; required unless master.
        :raises ApiError: when neither ``is_master`` nor ``category`` is given.
        """
        if not is_master and not category:
            raise ApiError("invalid_spec_dir", "Please specify what category this workflow belongs to.")
        spec_path = os.path.join(dir_item.path, self.WF_JSON_FILE)
        if os.path.exists(spec_path):
            with open(spec_path) as wf_json:
                data = json.load(wf_json)
                spec = self.WF_SCHEMA.load(data)
        else:
            spec = WorkflowSpecInfo(id=dir_item.name, library=False, standalone=False, is_master_spec=is_master,
                                    display_name=dir_item.name, description="", primary_process_id="",
                                    primary_file_name="", display_order=0, is_review=False,
                                    libraries=[])
            with open(spec_path, "w") as wf_json:
                json.dump(self.WF_SCHEMA.dump(spec), wf_json, indent=4)
        if is_master:
            self.master_spec = spec
        elif category:
            spec.category = category
            category.specs.append(spec)
        self.specs[spec.id] = spec