oh my god why were these ever instance methods
This commit is contained in:
parent
37fcc547f7
commit
e9a09342d5
|
@ -29,14 +29,14 @@ ALLOWED_BPMN_EXTENSIONS = {"bpmn", "dmn"}
|
|||
@admin_blueprint.route("/process-groups", methods=["GET"])
|
||||
def process_group_list() -> str:
|
||||
"""Process_group_list."""
|
||||
process_groups = ProcessModelService().get_process_groups()
|
||||
process_groups = ProcessModelService.get_process_groups()
|
||||
return render_template("process_group_list.html", process_groups=process_groups)
|
||||
|
||||
|
||||
@admin_blueprint.route("/process-groups/<process_group_id>", methods=["GET"])
|
||||
def process_group_show(process_group_id: str) -> str:
|
||||
"""Show_process_group."""
|
||||
process_group = ProcessModelService().get_process_group(process_group_id)
|
||||
process_group = ProcessModelService.get_process_group(process_group_id)
|
||||
return render_template("process_group_show.html", process_group=process_group)
|
||||
|
||||
|
||||
|
|
|
@ -194,11 +194,11 @@ def process_group_list(
|
|||
) -> flask.wrappers.Response:
|
||||
"""Process_group_list."""
|
||||
if process_group_identifier is not None:
|
||||
process_groups = ProcessModelService().get_process_groups(
|
||||
process_groups = ProcessModelService.get_process_groups(
|
||||
process_group_identifier
|
||||
)
|
||||
else:
|
||||
process_groups = ProcessModelService().get_process_groups()
|
||||
process_groups = ProcessModelService.get_process_groups()
|
||||
batch = ProcessModelService().get_batch(
|
||||
items=process_groups, page=page, per_page=per_page
|
||||
)
|
||||
|
@ -224,7 +224,7 @@ def process_group_show(
|
|||
"""Process_group_show."""
|
||||
process_group_id = un_modify_modified_process_model_id(modified_process_group_id)
|
||||
try:
|
||||
process_group = ProcessModelService().get_process_group(process_group_id)
|
||||
process_group = ProcessModelService.get_process_group(process_group_id)
|
||||
except ProcessEntityNotFoundError as exception:
|
||||
raise (
|
||||
ApiError(
|
||||
|
|
|
@ -37,9 +37,10 @@ class ProcessModelService(FileSystemService):
|
|||
GROUP_SCHEMA = ProcessGroupSchema()
|
||||
PROCESS_MODEL_SCHEMA = ProcessModelInfoSchema()
|
||||
|
||||
def is_group(self, path: str) -> bool:
|
||||
@classmethod
|
||||
def is_group(cls, path: str) -> bool:
|
||||
"""Is_group."""
|
||||
group_json_path = os.path.join(path, self.PROCESS_GROUP_JSON_FILE)
|
||||
group_json_path = os.path.join(path, cls.PROCESS_GROUP_JSON_FILE)
|
||||
if os.path.exists(group_json_path):
|
||||
return True
|
||||
return False
|
||||
|
@ -143,7 +144,7 @@ class ProcessModelService(FileSystemService):
|
|||
process_group_identifier, _ = os.path.split(relative_path)
|
||||
process_group = cls().get_process_group(process_group_identifier)
|
||||
path = os.path.join(FileSystemService.root_path(), relative_path)
|
||||
return cls().__scan_process_model(path, process_group=process_group)
|
||||
return cls.__scan_process_model(path, process_group=process_group)
|
||||
|
||||
@classmethod
|
||||
def get_process_model(cls, process_model_id: str) -> ProcessModelInfo:
|
||||
|
@ -213,23 +214,25 @@ class ProcessModelService(FileSystemService):
|
|||
full_group_id_path = process_group_id_segment
|
||||
else:
|
||||
full_group_id_path = f"{full_group_id_path}/{process_group_id_segment}" # type: ignore
|
||||
parent_group = ProcessModelService().get_process_group(full_group_id_path)
|
||||
parent_group = ProcessModelService.get_process_group(full_group_id_path)
|
||||
if parent_group:
|
||||
parent_group_array.append(
|
||||
{"id": parent_group.id, "display_name": parent_group.display_name}
|
||||
)
|
||||
return parent_group_array
|
||||
|
||||
@classmethod
|
||||
def get_process_groups(
|
||||
self, process_group_id: Optional[str] = None
|
||||
cls, process_group_id: Optional[str] = None
|
||||
) -> list[ProcessGroup]:
|
||||
"""Returns the process_groups as a list in display order."""
|
||||
process_groups = self.__scan_process_groups(process_group_id)
|
||||
process_groups = cls.__scan_process_groups(process_group_id)
|
||||
process_groups.sort()
|
||||
return process_groups
|
||||
|
||||
@classmethod
|
||||
def get_process_group(
|
||||
self, process_group_id: str, find_direct_nested_items: bool = True
|
||||
cls, process_group_id: str, find_direct_nested_items: bool = True
|
||||
) -> ProcessGroup:
|
||||
"""Look for a given process_group, and return it."""
|
||||
if os.path.exists(FileSystemService.root_path()):
|
||||
|
@ -239,7 +242,7 @@ class ProcessModelService(FileSystemService):
|
|||
FileSystemService.id_string_to_relative_path(process_group_id),
|
||||
)
|
||||
)
|
||||
if self.is_group(process_group_path):
|
||||
if cls.is_group(process_group_path):
|
||||
return self.find_or_create_process_group(
|
||||
process_group_path,
|
||||
find_direct_nested_items=find_direct_nested_items,
|
||||
|
@ -323,8 +326,9 @@ class ProcessModelService(FileSystemService):
|
|||
index += 1
|
||||
return process_groups
|
||||
|
||||
@classmethod
|
||||
def __scan_process_groups(
|
||||
self, process_group_id: Optional[str] = None
|
||||
cls, process_group_id: Optional[str] = None
|
||||
) -> list[ProcessGroup]:
|
||||
"""__scan_process_groups."""
|
||||
if not os.path.exists(FileSystemService.root_path()):
|
||||
|
@ -338,16 +342,17 @@ class ProcessModelService(FileSystemService):
|
|||
process_groups = []
|
||||
for item in directory_items:
|
||||
# if item.is_dir() and not item.name[0] == ".":
|
||||
if item.is_dir() and self.is_group(item): # type: ignore
|
||||
scanned_process_group = self.find_or_create_process_group(item.path)
|
||||
if item.is_dir() and cls.is_group(item): # type: ignore
|
||||
scanned_process_group = cls.find_or_create_process_group(item.path)
|
||||
process_groups.append(scanned_process_group)
|
||||
return process_groups
|
||||
|
||||
@classmethod
|
||||
def find_or_create_process_group(
|
||||
self, dir_path: str, find_direct_nested_items: bool = True
|
||||
cls, dir_path: str, find_direct_nested_items: bool = True
|
||||
) -> ProcessGroup:
|
||||
"""Reads the process_group.json file, and any nested directories."""
|
||||
cat_path = os.path.join(dir_path, self.PROCESS_GROUP_JSON_FILE)
|
||||
cat_path = os.path.join(dir_path, cls.PROCESS_GROUP_JSON_FILE)
|
||||
if os.path.exists(cat_path):
|
||||
with open(cat_path) as cat_json:
|
||||
data = json.load(cat_json)
|
||||
|
@ -368,7 +373,7 @@ class ProcessModelService(FileSystemService):
|
|||
display_order=10000,
|
||||
admin=False,
|
||||
)
|
||||
self.write_json_file(cat_path, self.GROUP_SCHEMA.dump(process_group))
|
||||
cls.write_json_file(cat_path, cls.GROUP_SCHEMA.dump(process_group))
|
||||
# we don't store `id` in the json files, so we add it in here
|
||||
process_group.id = process_group_id
|
||||
|
||||
|
@ -379,14 +384,14 @@ class ProcessModelService(FileSystemService):
|
|||
for nested_item in nested_items:
|
||||
if nested_item.is_dir():
|
||||
# TODO: check whether this is a group or model
|
||||
if self.is_group(nested_item.path):
|
||||
if cls.is_group(nested_item.path):
|
||||
# This is a nested group
|
||||
process_group.process_groups.append(
|
||||
self.find_or_create_process_group(nested_item.path)
|
||||
cls.find_or_create_process_group(nested_item.path)
|
||||
)
|
||||
elif ProcessModelService.is_model(nested_item.path):
|
||||
process_group.process_models.append(
|
||||
self.__scan_process_model(
|
||||
cls.__scan_process_model(
|
||||
nested_item.path,
|
||||
nested_item.name,
|
||||
process_group=process_group,
|
||||
|
@ -396,14 +401,15 @@ class ProcessModelService(FileSystemService):
|
|||
# process_group.process_groups.sort()
|
||||
return process_group
|
||||
|
||||
@classmethod
|
||||
def __scan_process_model(
|
||||
self,
|
||||
cls,
|
||||
path: str,
|
||||
name: Optional[str] = None,
|
||||
process_group: Optional[ProcessGroup] = None,
|
||||
) -> ProcessModelInfo:
|
||||
"""__scan_process_model."""
|
||||
json_file_path = os.path.join(path, self.PROCESS_MODEL_JSON_FILE)
|
||||
json_file_path = os.path.join(path, cls.PROCESS_MODEL_JSON_FILE)
|
||||
|
||||
if os.path.exists(json_file_path):
|
||||
with open(json_file_path) as wf_json:
|
||||
|
@ -432,8 +438,8 @@ class ProcessModelService(FileSystemService):
|
|||
description="",
|
||||
display_order=0,
|
||||
)
|
||||
self.write_json_file(
|
||||
json_file_path, self.PROCESS_MODEL_SCHEMA.dump(process_model_info)
|
||||
cls.write_json_file(
|
||||
json_file_path, cls.PROCESS_MODEL_SCHEMA.dump(process_model_info)
|
||||
)
|
||||
# we don't store `id` in the json files, so we add it in here
|
||||
process_model_info.id = name
|
||||
|
|
|
@ -138,7 +138,7 @@ class TestProcessApi(BaseTest):
|
|||
)
|
||||
assert model_display_name == process_model.display_name
|
||||
assert 0 == process_model.display_order
|
||||
assert 1 == len(ProcessModelService().get_process_groups())
|
||||
assert 1 == len(ProcessModelService.get_process_groups())
|
||||
|
||||
# add bpmn file to the model
|
||||
bpmn_file_name = "sample.bpmn"
|
||||
|
@ -539,7 +539,7 @@ class TestProcessApi(BaseTest):
|
|||
assert result.description == "Test Description"
|
||||
|
||||
# Check what is persisted
|
||||
persisted = ProcessModelService().get_process_group("test")
|
||||
persisted = ProcessModelService.get_process_group("test")
|
||||
assert persisted.display_name == "Another Test Category"
|
||||
assert persisted.id == "test"
|
||||
assert persisted.description == "Test Description"
|
||||
|
@ -561,7 +561,7 @@ class TestProcessApi(BaseTest):
|
|||
process_group_id,
|
||||
display_name=process_group_display_name,
|
||||
)
|
||||
persisted = ProcessModelService().get_process_group(process_group_id)
|
||||
persisted = ProcessModelService.get_process_group(process_group_id)
|
||||
assert persisted is not None
|
||||
assert persisted.id == process_group_id
|
||||
|
||||
|
@ -571,7 +571,7 @@ class TestProcessApi(BaseTest):
|
|||
)
|
||||
|
||||
with pytest.raises(ProcessEntityNotFoundError):
|
||||
ProcessModelService().get_process_group(process_group_id)
|
||||
ProcessModelService.get_process_group(process_group_id)
|
||||
|
||||
def test_process_group_update(
|
||||
self,
|
||||
|
@ -587,7 +587,7 @@ class TestProcessApi(BaseTest):
|
|||
self.create_process_group(
|
||||
client, with_super_admin_user, group_id, display_name=group_display_name
|
||||
)
|
||||
process_group = ProcessModelService().get_process_group(group_id)
|
||||
process_group = ProcessModelService.get_process_group(group_id)
|
||||
|
||||
assert process_group.display_name == group_display_name
|
||||
|
||||
|
@ -601,7 +601,7 @@ class TestProcessApi(BaseTest):
|
|||
)
|
||||
assert response.status_code == 200
|
||||
|
||||
process_group = ProcessModelService().get_process_group(group_id)
|
||||
process_group = ProcessModelService.get_process_group(group_id)
|
||||
assert process_group.display_name == "Modified Display Name"
|
||||
|
||||
def test_process_group_list(
|
||||
|
@ -2445,7 +2445,7 @@ class TestProcessApi(BaseTest):
|
|||
)
|
||||
# make sure initial groups exist
|
||||
for group in groups:
|
||||
persisted = ProcessModelService().get_process_group(group)
|
||||
persisted = ProcessModelService.get_process_group(group)
|
||||
assert persisted is not None
|
||||
assert persisted.id == group
|
||||
|
||||
|
@ -2519,7 +2519,7 @@ class TestProcessApi(BaseTest):
|
|||
client, with_super_admin_user, original_sub_path, display_name=sub_group_id
|
||||
)
|
||||
# make sure original subgroup exists
|
||||
persisted = ProcessModelService().get_process_group(original_sub_path)
|
||||
persisted = ProcessModelService.get_process_group(original_sub_path)
|
||||
assert persisted is not None
|
||||
assert persisted.id == original_sub_path
|
||||
|
||||
|
@ -2536,11 +2536,11 @@ class TestProcessApi(BaseTest):
|
|||
|
||||
# make sure the original subgroup does not exist
|
||||
with pytest.raises(ProcessEntityNotFoundError) as e:
|
||||
ProcessModelService().get_process_group(original_sub_path)
|
||||
ProcessModelService.get_process_group(original_sub_path)
|
||||
|
||||
assert e.value.args[0] == "process_group_not_found"
|
||||
assert e.value.args[1] == f"Process Group Id: {original_sub_path}"
|
||||
|
||||
# make sure the new subgroup does exist
|
||||
new_process_group = ProcessModelService().get_process_group(new_sub_path)
|
||||
new_process_group = ProcessModelService.get_process_group(new_sub_path)
|
||||
assert new_process_group.id == new_sub_path
|
||||
|
|
|
@ -12,5 +12,5 @@ def test_there_is_at_least_one_group_after_we_create_one(
|
|||
process_model_service = ProcessModelService()
|
||||
process_group = ProcessGroup(id="hey", display_name="sure")
|
||||
process_model_service.add_process_group(process_group)
|
||||
process_groups = ProcessModelService().get_process_groups()
|
||||
process_groups = ProcessModelService.get_process_groups()
|
||||
assert len(process_groups) > 0
|
||||
|
|
Loading…
Reference in New Issue