Cleanup - renaming frenzy, use os.path.abspath
This commit is contained in:
parent
897615d0fd
commit
fe582261fe
|
@ -32,7 +32,7 @@ class ProcessModelService(FileSystemService):
|
|||
the workflow process_models at once, or manage those file in a git repository. """
|
||||
|
||||
GROUP_SCHEMA = ProcessGroupSchema()
|
||||
WF_SCHEMA = ProcessModelInfoSchema()
|
||||
PROCESS_MODEL_SCHEMA = ProcessModelInfoSchema()
|
||||
|
||||
def is_group(self, path: str) -> bool:
|
||||
"""Is_group."""
|
||||
|
@ -76,12 +76,12 @@ class ProcessModelService(FileSystemService):
|
|||
|
||||
def save_process_model(self, process_model: ProcessModelInfo) -> None:
|
||||
"""Save_process_model."""
|
||||
spec_path = os.path.join(FileSystemService.root_path(), process_model.id)
|
||||
os.makedirs(spec_path, exist_ok=True)
|
||||
json_path = os.path.join(spec_path, self.PROCESS_MODEL_JSON_FILE)
|
||||
process_model_path = os.path.abspath(os.path.join(FileSystemService.root_path(), process_model.id))
|
||||
os.makedirs(process_model_path, exist_ok=True)
|
||||
json_path = os.path.abspath(os.path.join(process_model_path, self.PROCESS_MODEL_JSON_FILE))
|
||||
with open(json_path, "w") as wf_json:
|
||||
json.dump(
|
||||
self.WF_SCHEMA.dump(process_model), wf_json, indent=4, sort_keys=True
|
||||
self.PROCESS_MODEL_SCHEMA.dump(process_model), wf_json, indent=4, sort_keys=True
|
||||
)
|
||||
|
||||
def process_model_delete(self, process_model_id: str) -> None:
|
||||
|
@ -107,7 +107,7 @@ class ProcessModelService(FileSystemService):
|
|||
process_group_identifier, _ = os.path.split(relative_path)
|
||||
process_group = cls().get_process_group(process_group_identifier)
|
||||
path = os.path.join(FileSystemService.root_path(), relative_path)
|
||||
return cls().__scan_spec(path, process_group=process_group)
|
||||
return cls().__scan_process_model(path, process_group=process_group)
|
||||
|
||||
def get_process_model(self, process_model_id: str) -> ProcessModelInfo:
|
||||
"""Get a process model from a model and group id.
|
||||
|
@ -117,7 +117,9 @@ class ProcessModelService(FileSystemService):
|
|||
if not os.path.exists(FileSystemService.root_path()):
|
||||
raise ProcessEntityNotFoundError("process_model_root_not_found")
|
||||
|
||||
model_path = os.path.join(FileSystemService.root_path(), process_model_id)
|
||||
model_path = os.path.abspath(
|
||||
os.path.join(FileSystemService.root_path(), process_model_id)
|
||||
)
|
||||
if self.is_model(model_path):
|
||||
process_model = self.get_process_model_from_relative_path(process_model_id)
|
||||
return process_model
|
||||
|
@ -140,7 +142,7 @@ class ProcessModelService(FileSystemService):
|
|||
# process_group = self.__scan_process_group(
|
||||
# process_group_dir
|
||||
# )
|
||||
# return self.__scan_spec(sd.path, sd.name, process_group)
|
||||
# return self.__scan_process_model(sd.path, sd.name, process_group)
|
||||
raise ProcessEntityNotFoundError("process_model_not_found")
|
||||
|
||||
def get_process_models(
|
||||
|
@ -172,9 +174,11 @@ class ProcessModelService(FileSystemService):
|
|||
def get_process_group(self, process_group_id: str) -> ProcessGroup:
|
||||
"""Look for a given process_group, and return it."""
|
||||
if os.path.exists(FileSystemService.root_path()):
|
||||
process_group_path = os.path.join(
|
||||
process_group_path = os.path.abspath(
|
||||
os.path.join(
|
||||
FileSystemService.root_path(), process_group_id
|
||||
)
|
||||
)
|
||||
if self.is_group(process_group_path):
|
||||
return self.__scan_process_group(process_group_path)
|
||||
# nested_groups = []
|
||||
|
@ -312,7 +316,7 @@ class ProcessModelService(FileSystemService):
|
|||
)
|
||||
elif self.is_model(nested_item.path):
|
||||
process_group.process_models.append(
|
||||
self.__scan_spec(
|
||||
self.__scan_process_model(
|
||||
nested_item.path,
|
||||
nested_item.name,
|
||||
process_group=process_group,
|
||||
|
@ -322,22 +326,22 @@ class ProcessModelService(FileSystemService):
|
|||
# process_group.process_groups.sort()
|
||||
return process_group
|
||||
|
||||
def __scan_spec(
|
||||
def __scan_process_model(
|
||||
self,
|
||||
path: str,
|
||||
name: Optional[str] = None,
|
||||
process_group: Optional[ProcessGroup] = None,
|
||||
) -> ProcessModelInfo:
|
||||
"""__scan_spec."""
|
||||
spec_path = os.path.join(path, self.PROCESS_MODEL_JSON_FILE)
|
||||
"""__scan_process_model."""
|
||||
json_file_path = os.path.join(path, self.PROCESS_MODEL_JSON_FILE)
|
||||
|
||||
if os.path.exists(spec_path):
|
||||
with open(spec_path) as wf_json:
|
||||
if os.path.exists(json_file_path):
|
||||
with open(json_file_path) as wf_json:
|
||||
data = json.load(wf_json)
|
||||
if "process_group_id" in data:
|
||||
data.pop("process_group_id")
|
||||
spec = ProcessModelInfo(**data)
|
||||
if spec is None:
|
||||
process_model_info = ProcessModelInfo(**data)
|
||||
if process_model_info is None:
|
||||
raise ApiError(
|
||||
error_code="process_model_could_not_be_loaded_from_disk",
|
||||
message=f"We could not load the process_model from disk with data: {data}",
|
||||
|
@ -349,15 +353,15 @@ class ProcessModelService(FileSystemService):
|
|||
message="Missing name of process model. It should be given",
|
||||
)
|
||||
|
||||
spec = ProcessModelInfo(
|
||||
process_model_info = ProcessModelInfo(
|
||||
id=name,
|
||||
display_name=name,
|
||||
description="",
|
||||
display_order=0,
|
||||
is_review=False,
|
||||
)
|
||||
with open(spec_path, "w") as wf_json:
|
||||
json.dump(self.WF_SCHEMA.dump(spec), wf_json, indent=4)
|
||||
with open(json_file_path, "w") as wf_json:
|
||||
json.dump(self.PROCESS_MODEL_SCHEMA.dump(process_model_info), wf_json, indent=4)
|
||||
if process_group:
|
||||
spec.process_group = process_group.id
|
||||
return spec
|
||||
process_model_info.process_group = process_group.id
|
||||
return process_model_info
|
||||
|
|
|
@ -197,7 +197,9 @@ class SpecFileService(FileSystemService):
|
|||
@staticmethod
|
||||
def full_file_path(spec: ProcessModelInfo, file_name: str) -> str:
|
||||
"""File_path."""
|
||||
return os.path.join(SpecFileService.workflow_path(spec), file_name)
|
||||
return os.path.abspath(
|
||||
os.path.join(SpecFileService.workflow_path(spec), file_name)
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def last_modified(spec: ProcessModelInfo, file_name: str) -> datetime:
|
||||
|
|
|
@ -137,7 +137,9 @@ class BaseTest:
|
|||
# make sure we have a group
|
||||
process_group_id, _ = os.path.split(process_model_id)
|
||||
modified_process_group_id = process_group_id.replace("/", ":")
|
||||
process_group_path = f"{FileSystemService.root_path()}/{process_group_id}"
|
||||
process_group_path = os.path.abspath(
|
||||
os.path.join(FileSystemService.root_path(), process_group_id)
|
||||
)
|
||||
if ProcessModelService().is_group(process_group_path):
|
||||
|
||||
if exception_notification_addresses is None:
|
||||
|
|
Loading…
Reference in New Issue