return process models back with process group w/ burnettk

This commit is contained in:
jasquat 2022-06-03 13:40:11 -04:00
parent 7b3f3410d7
commit 786754a2a0
5 changed files with 41 additions and 46 deletions

View File

@ -1,29 +1,26 @@
"""Process_group."""
from marshmallow import post_load
from marshmallow import Schema
# class ProcessGroupModel(db.Model): # type: ignore
# """ProcessGroupMode."""
#
# __tablename__ = "process_group"
# id = db.Column(db.Integer, primary_key=True)
# name = db.Column(db.String(50))
import marshmallow
class ProcessGroup:
"""ProcessGroup."""
def __init__(self, id, display_name, display_order=0, admin=False):
def __init__(self, id, display_name, display_order=0, admin=False, process_models=None):
"""__init__."""
self.id = id # A unique string name, lower case, under scores (ie, 'my_group')
self.display_name = display_name
self.display_order = display_order
self.admin = admin
self.workflows = [] # For storing Workflow Metadata
self.specs = [] # For the list of specifications associated with a group
self.meta = None # For storing group metadata
if process_models is None:
process_models = []
self.process_models = process_models # For the list of specifications associated with a group
def __eq__(self, other):
"""__eq__."""
if not isinstance(other, ProcessGroup):
@ -40,7 +37,11 @@ class ProcessGroupSchema(Schema):
"""Meta."""
model = ProcessGroup
fields = ["id", "display_name", "display_order", "admin"]
fields = ["id", "display_name", "display_order", "admin", "process_models"]
process_models = marshmallow.fields.List(
marshmallow.fields.Nested("ProcessModelInfoSchema", dump_only=True, required=False)
)
@post_load
def make_cat(self, data, **kwargs):

View File

@ -1,5 +1,6 @@
{% extends "layout.html" %} {% block title %}Process Group: {{ process_group.id
}}{% endblock %} {% block content %}
{% extends "layout.html" %}
{% block title %}Process Group: {{ process_group.id }}{% endblock %}
{% block content %}
<button
type="button"
onclick="window.location.href='{{ url_for( 'admin.process_groups_list') }}';"
@ -8,8 +9,8 @@
</button>
<table>
<tbody>
{# here we iterate over every item in our list#} {% for process_model in
process_group.specs %}
{# here we iterate over every item in our list#}
{% for process_model in process_group.specs %}
<tr>
<td>
<a

View File

@ -18,7 +18,7 @@ class ProcessModelService(FileSystemService):
as it would have been stored in the database. This is specific to Workflow Specifications, and
Workflow Specification process_groups.
We do this, so we can easily drop in a new configuration on the file system, and change all
the workflow specs at once, or manage those file in a git repository. """
the workflow process_models at once, or manage those file in a git repository. """
GROUP_SCHEMA = ProcessGroupSchema()
WF_SCHEMA = ProcessModelInfoSchema()
@ -93,20 +93,20 @@ class ProcessModelService(FileSystemService):
def get_specs(self):
"""Get_specs."""
process_groups = self.get_process_groups()
specs = []
process_models = []
for cat in process_groups:
specs.extend(cat.specs)
return specs
process_models.extend(cat.process_models)
return process_models
def reorder_spec(self, spec: ProcessModelInfo, direction):
"""Reorder_spec."""
specs = spec.process_group.specs
specs.sort(key=lambda w: w.display_order)
index = specs.index(spec)
process_models = spec.process_group.process_models
process_models.sort(key=lambda w: w.display_order)
index = process_models.index(spec)
if direction == "up" and index > 0:
specs[index - 1], specs[index] = specs[index], specs[index - 1]
if direction == "down" and index < len(specs) - 1:
specs[index + 1], specs[index] = specs[index], specs[index + 1]
process_models[index - 1], process_models[index] = process_models[index], process_models[index - 1]
if direction == "down" and index < len(process_models) - 1:
process_models[index + 1], process_models[index] = process_models[index], process_models[index + 1]
return self.cleanup_workflow_spec_display_order(spec.process_group)
def cleanup_workflow_spec_display_order(self, process_group):
@ -114,11 +114,11 @@ class ProcessModelService(FileSystemService):
index = 0
if not process_group:
return []
for workflow in process_group.specs:
for workflow in process_group.process_models:
workflow.display_order = index
self.update_spec(workflow)
index += 1
return process_group.specs
return process_group.process_models
def get_process_groups(self) -> List[ProcessGroup]:
"""Returns the process_groups as a list in display order."""
@ -131,14 +131,14 @@ class ProcessModelService(FileSystemService):
cat = self.get_process_group(self.LIBRARY_SPECS)
if not cat:
return []
return cat.specs
return cat.process_models
def get_standalones(self) -> List[ProcessModelInfo]:
"""Get_standalones."""
cat = self.get_process_group(self.STAND_ALONE_SPECS)
if not cat:
return []
return cat.specs
return cat.process_models
def get_process_group(self, process_group_id):
"""Look for a given process_group, and return it."""
@ -233,13 +233,13 @@ class ProcessModelService(FileSystemService):
with open(cat_path, "w") as wf_json:
json.dump(self.GROUP_SCHEMA.dump(cat), wf_json, indent=4)
with os.scandir(dir_item.path) as workflow_dirs:
cat.specs = []
cat.process_models = []
for item in workflow_dirs:
if item.is_dir():
cat.specs.append(
cat.process_models.append(
self.__scan_spec(item.path, item.name, process_group=cat)
)
cat.specs.sort(key=lambda w: w.display_order)
cat.process_models.sort(key=lambda w: w.display_order)
return cat
@staticmethod

View File

@ -20,16 +20,6 @@ def find_or_create_user(username: str = "test_user1") -> Any:
return user
def find_or_create_process_group(name: str = "test_group1") -> Any:
process_group = ProcessGroupModel.query.filter_by(name=name).first()
if process_group is None:
process_group = ProcessGroupModel(name=name)
db.session.add(process_group)
db.session.commit()
return process_group
def assure_process_group_exists(process_group_id=None):
"""Assure_process_group_exists."""
process_group = None
@ -62,7 +52,7 @@ def load_test_spec(
process_group = None
workflow_spec_service = ProcessModelService()
if process_group_id is None:
process_group_id = dir_name
process_group_id = "test_process_group_id"
if not master_spec and not library:
process_group = assure_process_group_exists(process_group_id)
process_group_id = process_group.id

View File

@ -83,12 +83,15 @@ def test_get_process_groups_when_there_are_some(app, client: FlaskClient, with_b
def test_get_process_group_when_found(app, client: FlaskClient, with_bpmn_file_cleanup):
user = find_or_create_user()
load_test_spec(app, "hello_world")
test_process_group_id = "group_id1"
process_model_dir_name = "hello_world"
load_test_spec(app, process_model_dir_name, process_group_id=test_process_group_id)
rv = client.get(
"/v1.0/process-group/hello_world", headers=logged_in_headers(user)
f"/v1.0/process-group/{test_process_group_id}", headers=logged_in_headers(user)
)
assert rv.status_code == 200
assert rv.json["id"] == "hello_world"
assert rv.json["id"] == test_process_group_id
assert rv.json["process_models"][0]["id"] == process_model_dir_name
def create_process_model(app, client: FlaskClient):