Cleanup - renaming frenzy, use os.path.abspath

This commit is contained in:
mike cullerton 2022-11-17 16:35:28 -05:00
parent f88f576dcb
commit 0ccea0e8ac
3 changed files with 33 additions and 25 deletions

View File

@ -32,7 +32,7 @@ class ProcessModelService(FileSystemService):
the workflow process_models at once, or manage those file in a git repository. """
GROUP_SCHEMA = ProcessGroupSchema()
WF_SCHEMA = ProcessModelInfoSchema()
PROCESS_MODEL_SCHEMA = ProcessModelInfoSchema()
def is_group(self, path: str) -> bool:
"""Is_group."""
@ -76,12 +76,12 @@ class ProcessModelService(FileSystemService):
def save_process_model(self, process_model: ProcessModelInfo) -> None:
"""Save_process_model."""
spec_path = os.path.join(FileSystemService.root_path(), process_model.id)
os.makedirs(spec_path, exist_ok=True)
json_path = os.path.join(spec_path, self.PROCESS_MODEL_JSON_FILE)
process_model_path = os.path.abspath(os.path.join(FileSystemService.root_path(), process_model.id))
os.makedirs(process_model_path, exist_ok=True)
json_path = os.path.abspath(os.path.join(process_model_path, self.PROCESS_MODEL_JSON_FILE))
with open(json_path, "w") as wf_json:
json.dump(
self.WF_SCHEMA.dump(process_model), wf_json, indent=4, sort_keys=True
self.PROCESS_MODEL_SCHEMA.dump(process_model), wf_json, indent=4, sort_keys=True
)
def process_model_delete(self, process_model_id: str) -> None:
@ -107,7 +107,7 @@ class ProcessModelService(FileSystemService):
process_group_identifier, _ = os.path.split(relative_path)
process_group = cls().get_process_group(process_group_identifier)
path = os.path.join(FileSystemService.root_path(), relative_path)
return cls().__scan_spec(path, process_group=process_group)
return cls().__scan_process_model(path, process_group=process_group)
def get_process_model(self, process_model_id: str) -> ProcessModelInfo:
"""Get a process model from a model and group id.
@ -117,7 +117,9 @@ class ProcessModelService(FileSystemService):
if not os.path.exists(FileSystemService.root_path()):
raise ProcessEntityNotFoundError("process_model_root_not_found")
model_path = os.path.join(FileSystemService.root_path(), process_model_id)
model_path = os.path.abspath(
os.path.join(FileSystemService.root_path(), process_model_id)
)
if self.is_model(model_path):
process_model = self.get_process_model_from_relative_path(process_model_id)
return process_model
@ -140,7 +142,7 @@ class ProcessModelService(FileSystemService):
# process_group = self.__scan_process_group(
# process_group_dir
# )
# return self.__scan_spec(sd.path, sd.name, process_group)
# return self.__scan_process_model(sd.path, sd.name, process_group)
raise ProcessEntityNotFoundError("process_model_not_found")
def get_process_models(
@ -172,8 +174,10 @@ class ProcessModelService(FileSystemService):
def get_process_group(self, process_group_id: str) -> ProcessGroup:
"""Look for a given process_group, and return it."""
if os.path.exists(FileSystemService.root_path()):
process_group_path = os.path.join(
FileSystemService.root_path(), process_group_id
process_group_path = os.path.abspath(
os.path.join(
FileSystemService.root_path(), process_group_id
)
)
if self.is_group(process_group_path):
return self.__scan_process_group(process_group_path)
@ -312,7 +316,7 @@ class ProcessModelService(FileSystemService):
)
elif self.is_model(nested_item.path):
process_group.process_models.append(
self.__scan_spec(
self.__scan_process_model(
nested_item.path,
nested_item.name,
process_group=process_group,
@ -322,22 +326,22 @@ class ProcessModelService(FileSystemService):
# process_group.process_groups.sort()
return process_group
def __scan_spec(
def __scan_process_model(
self,
path: str,
name: Optional[str] = None,
process_group: Optional[ProcessGroup] = None,
) -> ProcessModelInfo:
"""__scan_spec."""
spec_path = os.path.join(path, self.PROCESS_MODEL_JSON_FILE)
"""__scan_process_model."""
json_file_path = os.path.join(path, self.PROCESS_MODEL_JSON_FILE)
if os.path.exists(spec_path):
with open(spec_path) as wf_json:
if os.path.exists(json_file_path):
with open(json_file_path) as wf_json:
data = json.load(wf_json)
if "process_group_id" in data:
data.pop("process_group_id")
spec = ProcessModelInfo(**data)
if spec is None:
process_model_info = ProcessModelInfo(**data)
if process_model_info is None:
raise ApiError(
error_code="process_model_could_not_be_loaded_from_disk",
message=f"We could not load the process_model from disk with data: {data}",
@ -349,15 +353,15 @@ class ProcessModelService(FileSystemService):
message="Missing name of process model. It should be given",
)
spec = ProcessModelInfo(
process_model_info = ProcessModelInfo(
id=name,
display_name=name,
description="",
display_order=0,
is_review=False,
)
with open(spec_path, "w") as wf_json:
json.dump(self.WF_SCHEMA.dump(spec), wf_json, indent=4)
with open(json_file_path, "w") as wf_json:
json.dump(self.PROCESS_MODEL_SCHEMA.dump(process_model_info), wf_json, indent=4)
if process_group:
spec.process_group = process_group.id
return spec
process_model_info.process_group = process_group.id
return process_model_info

View File

@ -197,7 +197,9 @@ class SpecFileService(FileSystemService):
@staticmethod
def full_file_path(spec: ProcessModelInfo, file_name: str) -> str:
"""File_path."""
return os.path.join(SpecFileService.workflow_path(spec), file_name)
return os.path.abspath(
os.path.join(SpecFileService.workflow_path(spec), file_name)
)
@staticmethod
def last_modified(spec: ProcessModelInfo, file_name: str) -> datetime:

View File

@ -137,7 +137,9 @@ class BaseTest:
# make sure we have a group
process_group_id, _ = os.path.split(process_model_id)
modified_process_group_id = process_group_id.replace("/", ":")
process_group_path = f"{FileSystemService.root_path()}/{process_group_id}"
process_group_path = os.path.abspath(
os.path.join(FileSystemService.root_path(), process_group_id)
)
if ProcessModelService().is_group(process_group_path):
if exception_notification_addresses is None: