Committing so Jon can view code
This commit is contained in:
parent
9d43fd1aab
commit
f8878e0f77
|
@ -197,7 +197,7 @@ paths:
|
|||
type: string
|
||||
# process_group_show
|
||||
get:
|
||||
operationId: spiffworkflow_backend.routes.process_api_blueprint.process_group_show
|
||||
operationId: spiffworkflow_backend.routes.process_api_blueprint.process_group_show_2
|
||||
summary: Returns a single process group
|
||||
tags:
|
||||
- Process Groups
|
||||
|
@ -271,7 +271,7 @@ paths:
|
|||
$ref: "#/components/schemas/ProcessModel"
|
||||
# process_model_add
|
||||
post:
|
||||
operationId: spiffworkflow_backend.routes.process_api_blueprint.process_model_add
|
||||
operationId: spiffworkflow_backend.routes.process_api_blueprint.process_model_add_2
|
||||
summary: Creates a new process model with the given parameters.
|
||||
tags:
|
||||
- Process Models
|
||||
|
@ -1620,6 +1620,8 @@ components:
|
|||
type: string
|
||||
name:
|
||||
type: string
|
||||
parent:
|
||||
type: string
|
||||
display_name:
|
||||
type: string
|
||||
display_order:
|
||||
|
|
|
@ -20,6 +20,7 @@ class ProcessGroup:
|
|||
|
||||
id: str # A unique string name, lower case, under scores (ie, 'my_group')
|
||||
display_name: str
|
||||
parent: str = ''
|
||||
display_order: int | None = 0
|
||||
admin: bool | None = False
|
||||
process_models: list[ProcessModelInfo] = field(
|
||||
|
@ -46,7 +47,7 @@ class ProcessGroupSchema(Schema):
|
|||
"""Meta."""
|
||||
|
||||
model = ProcessGroup
|
||||
fields = ["id", "display_name", "display_order", "admin", "process_models"]
|
||||
fields = ["id", "display_name", "display_order", "admin", "process_models", "parent"]
|
||||
|
||||
process_models = marshmallow.fields.List(
|
||||
marshmallow.fields.Nested(
|
||||
|
|
|
@ -184,6 +184,11 @@ def process_groups_list(page: int = 1, per_page: int = 100) -> flask.wrappers.Re
|
|||
return Response(json.dumps(response_json), status=200, mimetype="application/json")
|
||||
|
||||
|
||||
def process_group_show_2(process_group_id: str) -> Any:
|
||||
print(f"process_group_show_2: {process_group_id}")
|
||||
return process_group_show(process_group_id)
|
||||
|
||||
|
||||
def process_group_show(
|
||||
process_group_id: str,
|
||||
) -> Any:
|
||||
|
@ -201,6 +206,13 @@ def process_group_show(
|
|||
return ProcessGroupSchema().dump(process_group)
|
||||
|
||||
|
||||
def process_model_add_2(
|
||||
body: Dict[str, Union[str, bool, int]]
|
||||
):
|
||||
print("process_model_add_2")
|
||||
return process_model_add(body)
|
||||
|
||||
|
||||
def process_model_add(
|
||||
body: Dict[str, Union[str, bool, int]]
|
||||
) -> flask.wrappers.Response:
|
||||
|
|
|
@ -32,9 +32,9 @@ class FileSystemService:
|
|||
return os.path.join(app_root, "..", dir_name)
|
||||
|
||||
@staticmethod
|
||||
def process_group_path(name: str) -> str:
|
||||
def process_group_path(name: str, parent: str) -> str:
|
||||
"""Category_path."""
|
||||
return os.path.join(FileSystemService.root_path(), name)
|
||||
return os.path.join(FileSystemService.root_path(), parent, name)
|
||||
|
||||
@staticmethod
|
||||
def full_path_from_relative_path(relative_path: str) -> str:
|
||||
|
|
|
@ -34,6 +34,18 @@ class ProcessModelService(FileSystemService):
|
|||
GROUP_SCHEMA = ProcessGroupSchema()
|
||||
WF_SCHEMA = ProcessModelInfoSchema()
|
||||
|
||||
def is_group(self, path):
|
||||
group_json_path = os.path.join(path, self.CAT_JSON_FILE)
|
||||
if os.path.exists(group_json_path):
|
||||
return True
|
||||
return False
|
||||
|
||||
def is_model(self, path):
|
||||
model_json_path = os.path.join(path, self.WF_JSON_FILE)
|
||||
if os.path.exists(model_json_path):
|
||||
return True
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def get_batch(
|
||||
items: list[T],
|
||||
|
@ -148,12 +160,24 @@ class ProcessModelService(FileSystemService):
|
|||
def get_process_group(self, process_group_id: str) -> ProcessGroup:
|
||||
"""Look for a given process_group, and return it."""
|
||||
if os.path.exists(FileSystemService.root_path()):
|
||||
with os.scandir(FileSystemService.root_path()) as directory_items:
|
||||
for item in directory_items:
|
||||
if item.is_dir() and item.name == process_group_id:
|
||||
return self.__scan_process_group(item)
|
||||
process_group_path = os.path.join(FileSystemService.root_path(), process_group_id)
|
||||
if self.is_group(process_group_path):
|
||||
nested_groups = []
|
||||
process_group_dir = os.scandir(process_group_path)
|
||||
for item in process_group_dir:
|
||||
if self.is_group(item.path):
|
||||
nested_group = self.get_process_group(os.path.join(process_group_path, item.path))
|
||||
nested_groups.append(nested_group)
|
||||
elif self.is_model(item.path):
|
||||
print("get_process_group: ")
|
||||
return self.__scan_process_group(process_group_path)
|
||||
# with os.scandir(FileSystemService.root_path()) as directory_items:
|
||||
# for item in directory_items:
|
||||
# if item.is_dir() and item.name == process_group_id:
|
||||
# return self.__scan_process_group(item)
|
||||
|
||||
raise ProcessEntityNotFoundError(
|
||||
else:
|
||||
raise ProcessEntityNotFoundError(
|
||||
"process_group_not_found", f"Process Group Id: {process_group_id}"
|
||||
)
|
||||
|
||||
|
@ -165,7 +189,7 @@ class ProcessModelService(FileSystemService):
|
|||
|
||||
def update_process_group(self, process_group: ProcessGroup) -> ProcessGroup:
|
||||
"""Update_process_group."""
|
||||
cat_path = self.process_group_path(process_group.id)
|
||||
cat_path = self.process_group_path(process_group.id, process_group.parent)
|
||||
os.makedirs(cat_path, exist_ok=True)
|
||||
json_path = os.path.join(cat_path, self.CAT_JSON_FILE)
|
||||
with open(json_path, "w") as cat_json:
|
||||
|
@ -202,12 +226,14 @@ class ProcessModelService(FileSystemService):
|
|||
with os.scandir(FileSystemService.root_path()) as directory_items:
|
||||
process_groups = []
|
||||
for item in directory_items:
|
||||
if item.is_dir() and not item.name[0] == ".":
|
||||
process_groups.append(self.__scan_process_group(item))
|
||||
# if item.is_dir() and not item.name[0] == ".":
|
||||
if item.is_dir() and self.is_group(item):
|
||||
scanned_process_group = self.__scan_process_group(item)
|
||||
process_groups.append(scanned_process_group)
|
||||
return process_groups
|
||||
|
||||
def __scan_process_group(self, dir_item: os.DirEntry) -> ProcessGroup:
|
||||
"""Reads the process_group.json file, and any workflow directories."""
|
||||
"""Reads the process_group.json file, and any nested directories."""
|
||||
cat_path = os.path.join(dir_item.path, self.CAT_JSON_FILE)
|
||||
if os.path.exists(cat_path):
|
||||
with open(cat_path) as cat_json:
|
||||
|
@ -227,15 +253,20 @@ class ProcessModelService(FileSystemService):
|
|||
)
|
||||
with open(cat_path, "w") as wf_json:
|
||||
json.dump(self.GROUP_SCHEMA.dump(process_group), wf_json, indent=4)
|
||||
with os.scandir(dir_item.path) as workflow_dirs:
|
||||
with os.scandir(dir_item.path) as nested_items:
|
||||
process_group.process_models = []
|
||||
for item in workflow_dirs:
|
||||
if item.is_dir():
|
||||
process_group.process_models.append(
|
||||
self.__scan_spec(
|
||||
item.path, item.name, process_group=process_group
|
||||
for nested_item in nested_items:
|
||||
if nested_item.is_dir():
|
||||
# TODO: check whether this is a group or model
|
||||
if self.is_group(nested_item.path):
|
||||
# This is a nested group
|
||||
...
|
||||
elif self.is_model(nested_item.path):
|
||||
process_group.process_models.append(
|
||||
self.__scan_spec(
|
||||
nested_item.path, nested_item.name, process_group=process_group
|
||||
)
|
||||
)
|
||||
)
|
||||
process_group.process_models.sort()
|
||||
return process_group
|
||||
|
||||
|
|
|
@ -0,0 +1,134 @@
|
|||
import json
|
||||
|
||||
from spiffworkflow_backend.models.process_group import ProcessGroup, ProcessGroupSchema
|
||||
from spiffworkflow_backend.models.process_model import ProcessModelInfo, ProcessModelInfoSchema
|
||||
from spiffworkflow_backend.models.user import UserModel
|
||||
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
|
||||
from flask.app import Flask
|
||||
from flask.testing import FlaskClient
|
||||
|
||||
|
||||
class TestNestedGroups(BaseTest):
|
||||
|
||||
def test_nested_groups(
|
||||
self,
|
||||
app: Flask,
|
||||
client: FlaskClient,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
) -> None:
|
||||
# /process-groups/{process_group_path}/show
|
||||
target_uri = "/v1.0/process-groups/group_a,group_b/show"
|
||||
user = self.find_or_create_user()
|
||||
self.add_permissions_to_user(
|
||||
user, target_uri=target_uri, permission_names=["read"]
|
||||
)
|
||||
response = client.get(
|
||||
target_uri,
|
||||
headers=self.logged_in_headers(user)
|
||||
)
|
||||
print("test_nested_groups")
|
||||
|
||||
def test_add_nested_group(
|
||||
self,
|
||||
app: Flask,
|
||||
client: FlaskClient,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
with_super_admin_user: UserModel,
|
||||
) -> None:
|
||||
target_uri = "/process-groups"
|
||||
# user = self.find_or_create_user()
|
||||
# self.add_permissions_to_user(
|
||||
# user, target_uri=target_uri, permission_names=["read", "create"]
|
||||
# )
|
||||
process_group_a = ProcessGroup(
|
||||
id="group_a",
|
||||
display_name="Group A",
|
||||
display_order=0,
|
||||
admin=False,
|
||||
)
|
||||
response_a = client.post(
|
||||
"/v1.0/process-groups",
|
||||
headers=self.logged_in_headers(with_super_admin_user),
|
||||
content_type="application/json",
|
||||
data=json.dumps(ProcessGroupSchema().dump(process_group_a)),
|
||||
)
|
||||
|
||||
process_group_b = ProcessGroup(
|
||||
id="group_b",
|
||||
display_name="Group B",
|
||||
display_order=0,
|
||||
admin=False,
|
||||
parent='group_a'
|
||||
)
|
||||
response_b = client.post(
|
||||
"/v1.0/process-groups",
|
||||
headers=self.logged_in_headers(with_super_admin_user),
|
||||
content_type="application/json",
|
||||
data=json.dumps(ProcessGroupSchema().dump(process_group_b)),
|
||||
)
|
||||
|
||||
process_group_c = ProcessGroup(
|
||||
id="group_c",
|
||||
display_name="Group C",
|
||||
display_order=0,
|
||||
admin=False,
|
||||
parent='group_a/group_b'
|
||||
)
|
||||
response_c = client.post(
|
||||
"/v1.0/process-groups",
|
||||
headers=self.logged_in_headers(with_super_admin_user),
|
||||
content_type="application/json",
|
||||
data=json.dumps(ProcessGroupSchema().dump(process_group_c)),
|
||||
)
|
||||
|
||||
print("test_add_nested_group")
|
||||
|
||||
def test_process_model_add(
|
||||
self,
|
||||
app: Flask,
|
||||
client: FlaskClient,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
with_super_admin_user: UserModel,
|
||||
):
|
||||
process_group_a = ProcessGroup(
|
||||
id="group_a",
|
||||
display_name="Group A",
|
||||
display_order=0,
|
||||
admin=False,
|
||||
)
|
||||
response_a = client.post(
|
||||
"/v1.0/process-groups",
|
||||
headers=self.logged_in_headers(with_super_admin_user),
|
||||
content_type="application/json",
|
||||
data=json.dumps(ProcessGroupSchema().dump(process_group_a)),
|
||||
)
|
||||
|
||||
process_group_b = ProcessGroup(
|
||||
id="group_b",
|
||||
display_name="Group B",
|
||||
display_order=0,
|
||||
admin=False,
|
||||
parent='group_a'
|
||||
)
|
||||
response_b = client.post(
|
||||
"/v1.0/process-groups",
|
||||
headers=self.logged_in_headers(with_super_admin_user),
|
||||
content_type="application/json",
|
||||
data=json.dumps(ProcessGroupSchema().dump(process_group_b)),
|
||||
)
|
||||
process_model = ProcessModelInfo(
|
||||
id="process_model",
|
||||
display_name="Process Model",
|
||||
description="Process Model",
|
||||
process_group_id="group_a/group_b",
|
||||
primary_file_name="primary_file.bpmn",
|
||||
primary_process_id="primary_process_id",
|
||||
display_order=0
|
||||
)
|
||||
model_response = client.post(
|
||||
"v1.0/process-models",
|
||||
headers=self.logged_in_headers(with_super_admin_user),
|
||||
content_type="application/json",
|
||||
data=json.dumps(ProcessModelInfoSchema().dump(process_model))
|
||||
)
|
||||
print("test_process_model_add")
|
Loading…
Reference in New Issue