no reason to instantiate a ProcessModelService

This commit is contained in:
jasquat 2023-05-17 10:16:09 -04:00
parent 3d35dc6213
commit 1cd2a794eb
7 changed files with 37 additions and 45 deletions

View File

@ -55,9 +55,8 @@ def with_db_and_bpmn_file_cleanup() -> None:
try:
yield
finally:
process_model_service = ProcessModelService()
if os.path.exists(process_model_service.root_path()):
shutil.rmtree(process_model_service.root_path())
if os.path.exists(ProcessModelService.root_path()):
shutil.rmtree(ProcessModelService.root_path())
@pytest.fixture()

View File

@ -53,7 +53,7 @@ def process_group_delete(modified_process_group_id: str) -> flask.wrappers.Respo
process_group_id = _un_modify_modified_process_model_id(modified_process_group_id)
try:
ProcessModelService().process_group_delete(process_group_id)
ProcessModelService.process_group_delete(process_group_id)
except ProcessModelWithInstancesNotDeletableError as exception:
raise ApiError(
error_code="existing_instances",
@ -88,7 +88,7 @@ def process_group_list(
process_group_identifier: Optional[str] = None, page: int = 1, per_page: int = 100
) -> flask.wrappers.Response:
process_groups = ProcessModelService.get_process_groups_for_api(process_group_identifier)
batch = ProcessModelService().get_batch(items=process_groups, page=page, per_page=per_page)
batch = ProcessModelService.get_batch(items=process_groups, page=page, per_page=per_page)
pages = len(process_groups) // per_page
remainder = len(process_groups) % per_page
if remainder > 0:
@ -128,7 +128,7 @@ def process_group_show(
def process_group_move(modified_process_group_identifier: str, new_location: str) -> flask.wrappers.Response:
"""Process_group_move."""
original_process_group_id = _un_modify_modified_process_model_id(modified_process_group_identifier)
new_process_group = ProcessModelService().process_group_move(original_process_group_id, new_location)
new_process_group = ProcessModelService.process_group_move(original_process_group_id, new_location)
_commit_and_push_to_git(
f"User: {g.user.username} moved process group {original_process_group_id} to {new_process_group.id}"
)

View File

@ -104,7 +104,7 @@ def process_model_delete(
"""Process_model_delete."""
process_model_identifier = modified_process_model_identifier.replace(":", "/")
try:
ProcessModelService().process_model_delete(process_model_identifier)
ProcessModelService.process_model_delete(process_model_identifier)
except ProcessModelWithInstancesNotDeletableError as exception:
raise ApiError(
error_code="existing_instances",
@ -182,7 +182,7 @@ def process_model_show(modified_process_model_identifier: str, include_file_refe
def process_model_move(modified_process_model_identifier: str, new_location: str) -> flask.wrappers.Response:
"""Process_model_move."""
original_process_model_id = _un_modify_modified_process_model_id(modified_process_model_identifier)
new_process_model = ProcessModelService().process_model_move(original_process_model_id, new_location)
new_process_model = ProcessModelService.process_model_move(original_process_model_id, new_location)
_commit_and_push_to_git(
f"User: {g.user.username} moved process model {original_process_model_id} to {new_process_model.id}"
)
@ -219,7 +219,7 @@ def process_model_list(
recursive=recursive,
filter_runnable_by_user=filter_runnable_by_user,
)
process_models_to_return = ProcessModelService().get_batch(process_models, page=page, per_page=per_page)
process_models_to_return = ProcessModelService.get_batch(process_models, page=page, per_page=per_page)
if include_parent_groups:
process_group_cache = IdToProcessGroupMapping({})

View File

@ -420,7 +420,6 @@ class ProcessInstanceProcessor:
)
self.process_instance_model = process_instance_model
self.process_model_service = ProcessModelService()
bpmn_process_spec = None
self.full_bpmn_process_dict: dict = {}
@ -1018,7 +1017,7 @@ class ProcessInstanceProcessor:
ready_or_waiting_tasks = self.get_all_ready_or_waiting_tasks()
process_model_display_name = ""
process_model_info = self.process_model_service.get_process_model(
process_model_info = ProcessModelService.get_process_model(
self.process_instance_model.process_model_identifier
)
if process_model_info is not None:

View File

@ -192,8 +192,7 @@ class ProcessInstanceService:
"""
# navigation = processor.bpmn_process_instance.get_deep_nav_list()
# ProcessInstanceService.update_navigation(navigation, processor)
process_model_service = ProcessModelService()
process_model_service.get_process_model(processor.process_model_identifier)
ProcessModelService.get_process_model(processor.process_model_identifier)
process_instance_api = ProcessInstanceApi(
id=processor.get_process_instance_id(),
status=processor.get_status(),

View File

@ -94,7 +94,6 @@ class ProcessModelService(FileSystemService):
page: int = 1,
per_page: int = 10,
) -> list[T]:
"""Get_batch."""
start = (page - 1) * per_page
end = start + per_page
return items[start:end]
@ -129,8 +128,8 @@ class ProcessModelService(FileSystemService):
cls.write_json_file(json_path, json_data)
process_model.id = process_model_id
def process_model_delete(self, process_model_id: str) -> None:
"""Delete Procecss Model."""
@classmethod
def process_model_delete(cls, process_model_id: str) -> None:
instances = ProcessInstanceModel.query.filter(
ProcessInstanceModel.process_model_identifier == process_model_id
).all()
@ -138,19 +137,19 @@ class ProcessModelService(FileSystemService):
raise ProcessModelWithInstancesNotDeletableError(
f"We cannot delete the model `{process_model_id}`, there are existing instances that depend on it."
)
process_model = self.get_process_model(process_model_id)
path = self.process_model_full_path(process_model)
process_model = cls.get_process_model(process_model_id)
path = cls.process_model_full_path(process_model)
shutil.rmtree(path)
def process_model_move(self, original_process_model_id: str, new_location: str) -> ProcessModelInfo:
"""Process_model_move."""
process_model = self.get_process_model(original_process_model_id)
original_model_path = self.process_model_full_path(process_model)
@classmethod
def process_model_move(cls, original_process_model_id: str, new_location: str) -> ProcessModelInfo:
process_model = cls.get_process_model(original_process_model_id)
original_model_path = cls.process_model_full_path(process_model)
_, model_id = os.path.split(original_model_path)
new_relative_path = os.path.join(new_location, model_id)
new_model_path = os.path.abspath(os.path.join(FileSystemService.root_path(), new_relative_path))
shutil.move(original_model_path, new_model_path)
new_process_model = self.get_process_model(new_relative_path)
new_process_model = cls.get_process_model(new_relative_path)
return new_process_model
@classmethod
@ -315,12 +314,10 @@ class ProcessModelService(FileSystemService):
@classmethod
def add_process_group(cls, process_group: ProcessGroup) -> ProcessGroup:
"""Add_process_group."""
return cls.update_process_group(process_group)
@classmethod
def update_process_group(cls, process_group: ProcessGroup) -> ProcessGroup:
"""Update_process_group."""
cat_path = cls.full_path_from_id(process_group.id)
os.makedirs(cat_path, exist_ok=True)
json_path = os.path.join(cat_path, cls.PROCESS_GROUP_JSON_FILE)
@ -331,33 +328,33 @@ class ProcessModelService(FileSystemService):
cls.write_json_file(json_path, serialized_process_group)
return process_group
def process_group_move(self, original_process_group_id: str, new_location: str) -> ProcessGroup:
"""Process_group_move."""
original_group_path = self.full_path_from_id(original_process_group_id)
@classmethod
def process_group_move(cls, original_process_group_id: str, new_location: str) -> ProcessGroup:
original_group_path = cls.full_path_from_id(original_process_group_id)
_, original_group_id = os.path.split(original_group_path)
new_root = os.path.join(FileSystemService.root_path(), new_location)
new_group_path = os.path.abspath(os.path.join(FileSystemService.root_path(), new_root, original_group_id))
destination = shutil.move(original_group_path, new_group_path)
new_process_group = self.get_process_group(destination)
new_process_group = cls.get_process_group(destination)
return new_process_group
def __get_all_nested_models(self, group_path: str) -> list:
"""__get_all_nested_models."""
@classmethod
def __get_all_nested_models(cls, group_path: str) -> list:
all_nested_models = []
for _root, dirs, _files in os.walk(group_path):
for dir in dirs:
model_dir = os.path.join(group_path, dir)
if ProcessModelService.is_process_model(model_dir):
process_model = self.get_process_model(model_dir)
process_model = cls.get_process_model(model_dir)
all_nested_models.append(process_model)
return all_nested_models
def process_group_delete(self, process_group_id: str) -> None:
"""Delete_process_group."""
@classmethod
def process_group_delete(cls, process_group_id: str) -> None:
problem_models = []
path = self.full_path_from_id(process_group_id)
path = cls.full_path_from_id(process_group_id)
if os.path.exists(path):
nested_models = self.__get_all_nested_models(path)
nested_models = cls.__get_all_nested_models(path)
for process_model in nested_models:
instances = ProcessInstanceModel.query.filter(
ProcessInstanceModel.process_model_identifier == process_model.id
@ -371,15 +368,15 @@ class ProcessModelService(FileSystemService):
f" {problem_models}"
)
shutil.rmtree(path)
self.cleanup_process_group_display_order()
cls._cleanup_process_group_display_order()
def cleanup_process_group_display_order(self) -> List[Any]:
"""Cleanup_process_group_display_order."""
process_groups = self.get_process_groups() # Returns an ordered list
@classmethod
def _cleanup_process_group_display_order(cls) -> List[Any]:
process_groups = cls.get_process_groups() # Returns an ordered list
index = 0
for process_group in process_groups:
process_group.display_order = index
self.update_process_group(process_group)
cls.update_process_group(process_group)
index += 1
return process_groups

View File

@ -12,12 +12,10 @@ from spiffworkflow_backend.services.process_model_service import ProcessModelSer
def assure_process_group_exists(process_group_id: Optional[str] = None) -> ProcessGroup:
"""Assure_process_group_exists."""
process_group = None
process_model_service = ProcessModelService()
if process_group_id is not None:
try:
process_group = process_model_service.get_process_group(process_group_id)
process_group = ProcessModelService.get_process_group(process_group_id)
except ProcessEntityNotFoundError:
process_group = None
@ -31,7 +29,7 @@ def assure_process_group_exists(process_group_id: Optional[str] = None) -> Proce
admin=False,
display_order=0,
)
process_model_service.add_process_group(process_group)
ProcessModelService.add_process_group(process_group)
return process_group