gotta fix usage of is_model

This commit is contained in:
burnettk 2022-11-24 16:28:10 -05:00
parent 444b55a503
commit 5b73c4ddb0
10 changed files with 31 additions and 27 deletions

1
.gitignore vendored
View File

@ -1,2 +1,3 @@
pyrightconfig.json
.idea/
t

View File

@ -81,8 +81,7 @@ def process_model_show_file(process_model_id: str, file_name: str) -> str:
)
def process_model_upload_file(process_model_id: str) -> Response:
"""Process_model_upload_file."""
process_model_service = ProcessModelService()
process_model = process_model_service.get_process_model(process_model_id)
process_model = ProcessModelService.get_process_model(process_model_id)
if "file" not in request.files:
flash("No file part", "error")
@ -97,7 +96,7 @@ def process_model_upload_file(process_model_id: str) -> Response:
SpecFileService.add_file(
process_model, request_file.filename, request_file.stream.read()
)
process_model_service.save_process_model(process_model)
ProcessModelService.save_process_model(process_model)
return redirect(
url_for("admin.process_model_show", process_model_id=process_model.id)

View File

@ -159,9 +159,8 @@ def un_modify_modified_process_model_id(modified_process_model_id: str) -> str:
def process_group_add(body: dict) -> flask.wrappers.Response:
"""Add_process_group."""
process_model_service = ProcessModelService()
process_group = ProcessGroup(**body)
process_model_service.add_process_group(process_group)
ProcessModelService.add_process_group(process_group)
return make_response(jsonify(process_group), 201)
@ -185,7 +184,7 @@ def process_group_update(
process_group_id = un_modify_modified_process_model_id(modified_process_group_id)
process_group = ProcessGroup(id=process_group_id, **body_filtered)
ProcessModelService().update_process_group(process_group)
ProcessModelService.update_process_group(process_group)
return make_response(jsonify(process_group), 200)
@ -274,8 +273,7 @@ def process_model_create(
unmodified_process_group_id = un_modify_modified_process_model_id(
modified_process_group_id
)
process_model_service = ProcessModelService()
process_group = process_model_service.get_process_group(unmodified_process_group_id)
process_group = ProcessModelService.get_process_group(unmodified_process_group_id)
if process_group is None:
raise ApiError(
error_code="process_model_could_not_be_created",
@ -283,7 +281,7 @@ def process_model_create(
status_code=400,
)
process_model_service.add_process_model(process_model_info)
ProcessModelService.add_process_model(process_model_info)
return Response(
json.dumps(ProcessModelInfoSchema().dump(process_model_info)),
status=201,
@ -320,7 +318,7 @@ def process_model_update(
# process_model_identifier = f"{process_group_id}/{process_model_id}"
process_model = get_process_model(process_model_identifier)
ProcessModelService().update_process_model(process_model, body_filtered)
ProcessModelService.update_process_model(process_model, body_filtered)
return ProcessModelInfoSchema().dump(process_model)

View File

@ -72,35 +72,38 @@ class ProcessModelService(FileSystemService):
end = start + per_page
return items[start:end]
def add_process_model(self, process_model: ProcessModelInfo) -> None:
@classmethod
def add_process_model(cls, process_model: ProcessModelInfo) -> None:
"""Add_spec."""
self.save_process_model(process_model)
cls.save_process_model(process_model)
@classmethod
def update_process_model(
self, process_model: ProcessModelInfo, attributes_to_update: dict
cls, process_model: ProcessModelInfo, attributes_to_update: dict
) -> None:
"""Update_spec."""
for atu_key, atu_value in attributes_to_update.items():
if hasattr(process_model, atu_key):
setattr(process_model, atu_key, atu_value)
self.save_process_model(process_model)
cls.save_process_model(process_model)
def save_process_model(self, process_model: ProcessModelInfo) -> None:
@classmethod
def save_process_model(cls, process_model: ProcessModelInfo) -> None:
"""Save_process_model."""
process_model_path = os.path.abspath(
os.path.join(FileSystemService.root_path(), process_model.id)
)
os.makedirs(process_model_path, exist_ok=True)
json_path = os.path.abspath(
os.path.join(process_model_path, self.PROCESS_MODEL_JSON_FILE)
os.path.join(process_model_path, cls.PROCESS_MODEL_JSON_FILE)
)
process_model_id = process_model.id
# we don't save id in the json file
# this allows us to move models around on the filesystem
# the id is determined by its location on the filesystem
delattr(process_model, "id")
json_data = self.PROCESS_MODEL_SCHEMA.dump(process_model)
self.write_json_file(json_path, json_data)
json_data = cls.PROCESS_MODEL_SCHEMA.dump(process_model)
cls.write_json_file(json_path, json_data)
process_model.id = process_model_id
def process_model_delete(self, process_model_id: str) -> None:

View File

@ -171,7 +171,7 @@ class SpecFileService(FileSystemService):
ref.is_primary = True
if ref.is_primary:
ProcessModelService().update_process_model(
ProcessModelService.update_process_model(
process_model_info,
{
"primary_process_id": ref.identifier,

View File

@ -140,7 +140,7 @@ class BaseTest:
process_group_path = os.path.abspath(
os.path.join(FileSystemService.root_path(), process_group_id)
)
if ProcessModelService().is_group(process_group_path):
if ProcessModelService.is_group(process_group_path):
if exception_notification_addresses is None:
exception_notification_addresses = []

View File

@ -37,8 +37,7 @@ class ExampleDataLoader:
description=description,
display_order=display_order,
)
workflow_spec_service = ProcessModelService()
workflow_spec_service.add_process_model(spec)
ProcessModelService.add_process_model(spec)
bpmn_file_name_with_extension = bpmn_file_name
if not bpmn_file_name_with_extension:
@ -87,7 +86,7 @@ class ExampleDataLoader:
)
spec.primary_process_id = references[0].identifier
spec.primary_file_name = filename
ProcessModelService().save_process_model(spec)
ProcessModelService.save_process_model(spec)
finally:
if file:
file.close()

View File

@ -1895,7 +1895,7 @@ class TestProcessApi(BaseTest):
client, process_model_identifier, with_super_admin_user
)
process_model = ProcessModelService.get_process_model(process_model_identifier)
ProcessModelService().update_process_model(
ProcessModelService.update_process_model(
process_model,
{"fault_or_suspend_on_exception": NotificationType.suspend.value},
)
@ -1948,7 +1948,7 @@ class TestProcessApi(BaseTest):
)
process_model = ProcessModelService.get_process_model(process_model_identifier)
ProcessModelService().update_process_model(
ProcessModelService.update_process_model(
process_model,
{"exception_notification_addresses": ["with_super_admin_user@example.com"]},
)

View File

@ -2,6 +2,7 @@
from flask.app import Flask
import os
from spiffworkflow_backend.models.process_group import ProcessGroup
from spiffworkflow_backend.models.process_model import ProcessModelInfo
from spiffworkflow_backend.services.acceptance_test_fixtures import (
load_acceptance_test_fixtures,
@ -20,6 +21,9 @@ def test_start_dates_are_one_hour_apart(app: Flask) -> None:
if not ProcessModelService.is_group(group_identifier):
process_group = ProcessGroup(id=group_identifier, display_name=group_identifier)
ProcessModelService.add_process_group(process_group)
if not ProcessModelService.is_model(process_model_identifier):
process_model = ProcessModelInfo(id=process_model_identifier, display_name=process_model_identifier, description='hey')
ProcessModelService.add_process_model(process_model)
process_instances = load_acceptance_test_fixtures()
assert len(process_instances) > 2

View File

@ -32,7 +32,7 @@ class TestProcessModelService(BaseTest):
primary_process_id = process_model.primary_process_id
assert primary_process_id == "Process_HelloWorld"
ProcessModelService().update_process_model(
ProcessModelService.update_process_model(
process_model, {"display_name": "new_name"}
)