some more type fixes w/ burnettk

jasquat 2022-06-22 16:52:50 -04:00
parent 2a43b5ae91
commit 388cee7018
11 changed files with 97 additions and 212 deletions

View File

@ -1,3 +1,5 @@
from __future__ import with_statement
import logging
from logging.config import fileConfig

View File

@ -1,8 +1,8 @@
"""empty message
-Revision ID: acad9ea3a861
+Revision ID: 26094f78f273
Revises:
-Create Date: 2022-06-21 12:45:20.062981
+Create Date: 2022-06-22 16:42:29.228683
"""
from alembic import op
@ -10,7 +10,7 @@ import sqlalchemy as sa
# revision identifiers, used by Alembic.
-revision = 'acad9ea3a861'
+revision = '26094f78f273'
down_revision = None
branch_labels = None
depends_on = None
@ -54,11 +54,12 @@ def upgrade():
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('process_model_identifier', sa.String(length=50), nullable=False),
sa.Column('process_group_identifier', sa.String(length=50), nullable=False),
+sa.Column('process_initiator_id', sa.Integer(), nullable=False),
sa.Column('bpmn_json', sa.JSON(), nullable=True),
sa.Column('start_in_seconds', sa.Integer(), nullable=True),
sa.Column('end_in_seconds', sa.Integer(), nullable=True),
sa.Column('updated_at_in_seconds', sa.Integer(), nullable=True),
-sa.Column('process_initiator_id', sa.Integer(), nullable=False),
+sa.Column('created_at_in_seconds', sa.Integer(), nullable=True),
sa.Column('status', sa.Enum('not_started', 'user_input_required', 'waiting', 'complete', 'erroring', name='processinstancestatus'), nullable=True),
sa.ForeignKeyConstraint(['process_initiator_id'], ['user.id'], ),
sa.PrimaryKeyConstraint('id')
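The reordering above (process_initiator_id moving up, created_at_in_seconds appearing) mirrors the attribute order of the updated ProcessInstanceModel later in this commit, since Alembic autogenerate emits create_table columns in model-definition order. A minimal sketch, assuming a configured alembic.ini in the working directory, of applying the regenerated revision programmatically:

# Equivalent to running `alembic upgrade head` from the shell.
from alembic.config import main as alembic_main

alembic_main(argv=["upgrade", "head"])  # brings the schema up to 26094f78f273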

poetry.lock generated
View File

@ -356,7 +356,7 @@ testing = ["pytest (>=6)", "pytest-checkdocs (>=2.4)", "pytest-flake8", "pytest-
[[package]]
name = "connexion"
version = "2.13.1"
version = "2.14.0"
description = "Connexion - API first applications with OpenAPI/Swagger and Flask"
category = "main"
optional = false
@ -598,7 +598,7 @@ description = "Flask Bpmn"
category = "main"
optional = false
python-versions = "^3.7"
-develop = false
+develop = true
[package.dependencies]
click = "^8.0.1"
@ -616,10 +616,8 @@ spiffworkflow = {git = "https://github.com/sartography/SpiffWorkflow", rev = "ma
werkzeug = "*"
[package.source]
type = "git"
url = "https://github.com/sartography/flask-bpmn"
reference = "main"
resolved_reference = "444daed6de1e2b5de16a96970159c952c5523650"
type = "directory"
url = "../flask-bpmn"
[[package]]
name = "flask-cors"
@ -2033,7 +2031,7 @@ testing = ["pytest (>=6)", "pytest-checkdocs (>=2.4)", "pytest-flake8", "pytest-
[metadata]
lock-version = "1.1"
python-versions = "^3.9"
content-hash = "b4302cbeba2b6c5a3777a4b3eb4549fbc6e910ab832b2bf0d573110564d67690"
content-hash = "2f02aaeb08296c0990e2fa48b0bb5bb2775268552fd11c149bfc85e87ce09c97"
[metadata.files]
alabaster = [
@ -2222,8 +2220,8 @@ configparser = [
{file = "configparser-5.2.0.tar.gz", hash = "sha256:1b35798fdf1713f1c3139016cfcbc461f09edbf099d1fb658d4b7479fcaa3daa"},
]
connexion = [
{file = "connexion-2.13.1-py2.py3-none-any.whl", hash = "sha256:fb9a8c7a60fdecac45c913c1373948b28d8a55328472e08b5132fba36da36524"},
{file = "connexion-2.13.1.tar.gz", hash = "sha256:80b534800b408d184fe6d36e755edefdd657e8f11756b5806d6bcf7213380d0d"},
{file = "connexion-2.14.0-py2.py3-none-any.whl", hash = "sha256:4e50c1b0b6d287e20830d053c8de09a73bead5ac0760200ade074364c7362ab6"},
{file = "connexion-2.14.0.tar.gz", hash = "sha256:ed6f9c97ca5281257935c5530570b2a2394a689ece1b171c18d855cf751adbb4"},
]
coverage = [
{file = "coverage-6.4.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:f1d5aa2703e1dab4ae6cf416eb0095304f49d004c39e9db1d86f57924f43006b"},

View File

@ -30,9 +30,9 @@ werkzeug = "*"
spiffworkflow = {git = "https://github.com/sartography/SpiffWorkflow", rev = "main"}
sentry-sdk = "0.14.4"
sphinx-autoapi = "^1.8.4"
-# flask-bpmn = {develop = true, path = "/home/jason/projects/github/sartography/flask-bpmn"}
+flask-bpmn = {develop = true, path = "/home/jason/projects/github/sartography/flask-bpmn"}
# flask-bpmn = {develop = true, path = "/Users/kevin/projects/github/sartography/flask-bpmn"}
-flask-bpmn = {git = "https://github.com/sartography/flask-bpmn", rev = "main"}
+# flask-bpmn = {git = "https://github.com/sartography/flask-bpmn", rev = "main"}
mysql-connector-python = "^8.0.29"
pytest-flask = "^1.2.0"
pytest-flask-sqlalchemy = "^1.1.0"
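This is the source of the poetry.lock change above: flask-bpmn now resolves as an editable path dependency from a sibling checkout (develop = true) rather than from the git main branch. A quick sanity check after `poetry install`, assuming the package imports as flask_bpmn:

# Hypothetical check that the editable install is active: the module
# should resolve inside the local ../flask-bpmn checkout, not site-packages.
import flask_bpmn

print(flask_bpmn.__file__)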

View File

@ -24,7 +24,7 @@ class ProcessGroup:
display_name: str
display_order: Optional[int] = 0
admin: Optional[bool] = False
-process_models: Optional[list[ProcessModelInfo]] = field(default_factory=list[ProcessModelInfo])
+process_models: list[ProcessModelInfo] = field(default_factory=list[ProcessModelInfo])
def __post_init__(self) -> None:
"""__post_init__."""

View File

@ -80,19 +80,21 @@ class ProcessInstanceModel(SpiffworkflowBaseDBModel):
"""ProcessInstanceModel."""
__tablename__ = "process_instance"
-id = db.Column(db.Integer, primary_key=True) # type: ignore
-process_model_identifier = db.Column(db.String(50), nullable=False, index=True) # type: ignore
-process_group_identifier = db.Column(db.String(50), nullable=False, index=True) # type: ignore
-bpmn_json = deferred(db.Column(db.JSON)) # type: ignore
-start_in_seconds = db.Column(db.Integer) # type: ignore
-end_in_seconds = db.Column(db.Integer) # type: ignore
-updated_at_in_seconds = db.Column(db.Integer) # type: ignore
-process_initiator_id = db.Column(ForeignKey(UserModel.id), nullable=False) # type: ignore
+id: int = db.Column(db.Integer, primary_key=True) # type: ignore
+process_model_identifier: str = db.Column(db.String(50), nullable=False, index=True) # type: ignore
+process_group_identifier: str = db.Column(db.String(50), nullable=False, index=True) # type: ignore
+process_initiator_id: int = db.Column(ForeignKey(UserModel.id), nullable=False) # type: ignore
process_initiator = relationship("UserModel")
-status = db.Column(db.Enum(ProcessInstanceStatus)) # type: ignore
+bpmn_json: Optional[str] = deferred(db.Column(db.JSON)) # type: ignore
+start_in_seconds: Optional[int] = db.Column(db.Integer) # type: ignore
+end_in_seconds: Optional[int] = db.Column(db.Integer) # type: ignore
+updated_at_in_seconds: int = db.Column(db.Integer) # type: ignore
+created_at_in_seconds: int = db.Column(db.Integer) # type: ignore
+status: ProcessInstanceStatus = db.Column(db.Enum(ProcessInstanceStatus)) # type: ignore
@property
-def serialized(self) -> Dict[str, Union[int, str]]:
+def serialized(self) -> Dict[str, Union[int, str, None]]:
"""Return object data in serializeable format."""
return {
"id": self.id,

View File

@ -45,7 +45,7 @@ class ProcessModelInfo:
def __post_init__(self) -> None:
"""__post_init__."""
self.sort_index = f"{self.process_group_id}:{self.id}"
self.sort_index = f"{self.display_order}:{self.process_group_id}:{self.id}"
def __eq__(self, other: Any) -> bool:
"""__eq__."""

View File

@ -47,14 +47,14 @@ from spiffworkflow_backend.services.user_service import UserService
# from crc.services.user_file_service import UserFileService
-class CustomBpmnScriptEngine(PythonScriptEngine):
+class CustomBpmnScriptEngine(PythonScriptEngine): # type: ignore
"""This is a custom script processor that can be easily injected into Spiff Workflow.
It will execute python code read in from the bpmn. It will also make any scripts in the
scripts directory available for execution.
"""
-def evaluate(self, task: Task, expression: str) -> str:
+def evaluate(self, task: Task, expression: str) -> Any:
"""Evaluate."""
return self._evaluate(expression, task.data, task)
@ -63,8 +63,8 @@ class CustomBpmnScriptEngine(PythonScriptEngine):
expression: str,
context: Dict[str, Union[Box, str]],
task: Optional[Task] = None,
-external_methods: None = None,
-) -> str:
+_external_methods: None = None,
+) -> Any:
"""Evaluate the given expression, within the context of the given task and return the result."""
try:
return super()._evaluate(expression, context, task, {})
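Returning Any is the honest signature here: a BPMN expression can evaluate to a bool (gateway conditions), a number, a dict, or any other Python value, so the old `-> str` was simply wrong. A standalone illustration of the idea, not SpiffWorkflow's actual API:

from typing import Any


def evaluate_sketch(expression: str, context: dict) -> Any:
    # A gateway condition yields bool, a calculation yields a number,
    # a script can yield anything: hence Any rather than str.
    return eval(expression, {}, context)  # illustrative only; real engines sandbox this


assert evaluate_sketch("x > 1", {"x": 2}) is True
assert evaluate_sketch("x * 2", {"x": 2}) == 4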
@ -87,7 +87,7 @@ class CustomBpmnScriptEngine(PythonScriptEngine):
raise WorkflowTaskExecException(task, f" {script}, {e}", e) from e
-class MyCustomParser(BpmnDmnParser):
+class MyCustomParser(BpmnDmnParser): # type: ignore
"""A BPMN and DMN parser that can also parse Camunda forms."""
OVERRIDE_PARSER_CLASSES = BpmnDmnParser.OVERRIDE_PARSER_CLASSES
@ -153,7 +153,7 @@ class ProcessInstanceProcessor:
message += f"\n Spec Size: {spec_size}"
current_app.logger.warning(message)
-def check_sub_specs(test_spec, indent=0, show_all=False):
+def check_sub_specs(test_spec: dict, indent: int = 0, show_all: bool = False) -> None:
"""Check_sub_specs."""
for my_spec_name in test_spec["task_specs"]:
my_spec = test_spec["task_specs"][my_spec_name]
@ -225,7 +225,7 @@ class ProcessInstanceProcessor:
task.data["current_user"] = current_user_data
@staticmethod
-def reset(process_instance_model, clear_data=False):
+def reset(process_instance_model: ProcessInstanceModel, clear_data: bool = False) -> None:
"""Resets the process_instance back to an unstarted state - where nothing has happened yet.
If clear_data is set to false, then the information
@ -275,46 +275,12 @@ class ProcessInstanceProcessor:
# UserFileService().delete_file(file.id)
db.session.commit()
-@staticmethod
-def __get_bpmn_workflow(
-process_instance_model: ProcessInstanceModel,
-spec: WorkflowSpec = None,
-validate_only=False,
-):
-"""__get_bpmn_workflow."""
-if process_instance_model.bpmn_workflow_json:
-version = ProcessInstanceProcessor._serializer.get_version(
-process_instance_model.bpmn_workflow_json
-)
-if version == ProcessInstanceProcessor.SERIALIZER_VERSION:
-bpmn_workflow = ProcessInstanceProcessor._serializer.deserialize_json(
-process_instance_model.bpmn_workflow_json
-)
-else:
-bpmn_workflow = (
-ProcessInstanceProcessor._old_serializer.deserialize_workflow(
-process_instance_model.bpmn_workflow_json, workflow_spec=spec
-)
-)
-bpmn_workflow.script_engine = ProcessInstanceProcessor._script_engine
-else:
-bpmn_workflow = BpmnWorkflow(
-spec, script_engine=ProcessInstanceProcessor._script_engine
-)
-bpmn_workflow.data[
-ProcessInstanceProcessor.PROCESS_INSTANCE_ID_KEY
-] = process_instance_model.study_id
-bpmn_workflow.data[
-ProcessInstanceProcessor.VALIDATION_PROCESS_KEY
-] = validate_only
-return bpmn_workflow
@staticmethod
def __get_bpmn_process_instance(
process_instance_model: ProcessInstanceModel,
spec: WorkflowSpec = None,
-validate_only=False,
-):
+validate_only: bool = False,
+) -> BpmnWorkflow:
"""__get_bpmn_process_instance."""
if process_instance_model.bpmn_json:
version = ProcessInstanceProcessor._serializer.get_version(
@ -363,38 +329,6 @@ class ProcessInstanceProcessor:
db.session.add(self.process_instance_model)
db.session.commit()
-@staticmethod
-def run_master_spec(process_model):
-"""Executes a BPMN specification for the given process_model, without recording any information to the database.
-Useful for running the master specification, which should not persist.
-"""
-spec_files = SpecFileService().get_files(process_model, include_libraries=True)
-spec = ProcessInstanceProcessor.get_spec(spec_files, process_model)
-try:
-bpmn_process_instance = BpmnWorkflow(
-spec, script_engine=ProcessInstanceProcessor._script_engine
-)
-bpmn_process_instance.data[
-ProcessInstanceProcessor.VALIDATION_PROCESS_KEY
-] = False
-ProcessInstanceProcessor.add_user_info_to_process_instance(
-bpmn_process_instance
-)
-bpmn_process_instance.do_engine_steps()
-except WorkflowException as we:
-raise ApiError.from_task_spec(
-"error_running_master_spec", str(we), we.sender
-) from we
-if not bpmn_process_instance.is_completed():
-raise ApiError(
-"master_spec_not_automatic",
-"The master spec should only contain fully automated tasks, it failed to complete.",
-)
-return bpmn_process_instance.last_task.data
@staticmethod
def get_parser() -> MyCustomParser:
"""Get_parser."""
@ -466,12 +400,12 @@ class ProcessInstanceProcessor:
except WorkflowTaskExecException as we:
raise ApiError.from_workflow_exception("task_error", str(we), we) from we
-def cancel_notify(self):
+def cancel_notify(self) -> None:
"""Cancel_notify."""
self.__cancel_notify(self.bpmn_process_instance)
@staticmethod
-def __cancel_notify(bpmn_process_instance):
+def __cancel_notify(bpmn_process_instance: BpmnWorkflow) -> None:
"""__cancel_notify."""
try:
# A little hackly, but make the bpmn_process_instance catch a cancel event.
@ -483,11 +417,11 @@ class ProcessInstanceProcessor:
def serialize(self) -> str:
"""Serialize."""
-return self._serializer.serialize_json(self.bpmn_process_instance)
+return self._serializer.serialize_json(self.bpmn_process_instance) # type: ignore
-def next_user_tasks(self):
+def next_user_tasks(self) -> list[Task]:
"""Next_user_tasks."""
-return self.bpmn_process_instance.get_ready_user_tasks()
+return self.bpmn_process_instance.get_ready_user_tasks() # type: ignore
def next_task(self) -> Task:
"""Returns the next task that should be completed even if there are parallel tasks and multiple options are available.
@ -558,50 +492,37 @@ class ProcessInstanceProcessor:
next_task = task
return next_task
-def completed_user_tasks(self) -> List[Any]:
+def completed_user_tasks(self) -> List[Task]:
"""Completed_user_tasks."""
-completed_user_tasks = self.bpmn_process_instance.get_tasks(TaskState.COMPLETED)
-completed_user_tasks.reverse()
-completed_user_tasks = list(
+user_tasks = self.bpmn_process_instance.get_tasks(TaskState.COMPLETED)
+user_tasks.reverse()
+user_tasks = list(
filter(
lambda task: not self.bpmn_process_instance._is_engine_task(
task.task_spec
),
-completed_user_tasks,
+user_tasks,
)
)
-return completed_user_tasks
+return user_tasks # type: ignore
def previous_task(self):
"""Previous_task."""
return None
-def complete_task(self, task):
+def complete_task(self, task: Task) -> None:
"""Complete_task."""
self.bpmn_process_instance.complete_task_from_id(task.id)
def get_data(self) -> dict[str, str]:
"""Get_data."""
-return self.bpmn_process_instance.data
+return self.bpmn_process_instance.data # type: ignore
def get_process_instance_id(self) -> int:
"""Get_process_instance_id."""
return self.process_instance_model.id
-@staticmethod
-def find_top_level_process_instance(task):
-"""Find_top_level_process_instance."""
-# Find the top level process_instance, as this is where the parent id etc... are stored.
-process_instance = task.process_instance
-while process_instance.outer_process_instance != process_instance:
-process_instance = process_instance.outer_process_instance
-return process_instance
-def get_ready_user_tasks(self):
+def get_ready_user_tasks(self) -> list[Task]:
"""Get_ready_user_tasks."""
-return self.bpmn_process_instance.get_ready_user_tasks()
+return self.bpmn_process_instance.get_ready_user_tasks() # type: ignore
-def get_current_user_tasks(self):
+def get_current_user_tasks(self) -> list[Task]:
"""Return a list of all user tasks that are READY or COMPLETE and are parallel to the READY Task."""
ready_tasks = self.bpmn_process_instance.get_ready_user_tasks()
additional_tasks = []
@ -609,9 +530,9 @@ class ProcessInstanceProcessor:
for child in ready_tasks[0].parent.children:
if child.state == TaskState.COMPLETED:
additional_tasks.append(child)
-return ready_tasks + additional_tasks
+return ready_tasks + additional_tasks # type: ignore
-def get_all_user_tasks(self) -> List[Union[Task, Any]]:
+def get_all_user_tasks(self) -> List[Task]:
"""Get_all_user_tasks."""
all_tasks = self.bpmn_process_instance.get_tasks(TaskState.ANY_MASK)
return [
@ -620,7 +541,7 @@ class ProcessInstanceProcessor:
if not self.bpmn_process_instance._is_engine_task(t.task_spec)
]
-def get_all_completed_tasks(self):
+def get_all_completed_tasks(self) -> list[Task]:
"""Get_all_completed_tasks."""
all_tasks = self.bpmn_process_instance.get_tasks(TaskState.ANY_MASK)
return [
@ -630,13 +551,14 @@ class ProcessInstanceProcessor:
and t.state in [TaskState.COMPLETED, TaskState.CANCELLED]
]
-def get_nav_item(self, task):
+def get_nav_item(self, task: Task) -> Any:
"""Get_nav_item."""
for nav_item in self.bpmn_process_instance.get_nav_list():
if nav_item["task_id"] == task.id:
return nav_item
return None
-def find_spec_and_field(self, spec_name, field_id):
+def find_spec_and_field(self, spec_name: str, field_id: Union[str, int]) -> None:
"""Tracks down a form field by name in the process_instance spec(s), Returns a tuple of the task, and form."""
process_instances = [self.bpmn_process_instance]
for task in self.bpmn_process_instance.get_ready_user_tasks():

View File

@ -86,28 +86,31 @@ class ProcessModelService(FileSystemService):
"""Master_spec."""
return self.get_master_spec()
-def get_master_spec(self) -> None:
+def get_master_spec(self) -> Optional[ProcessModelInfo]:
"""Get_master_spec."""
path = os.path.join(
FileSystemService.root_path(), FileSystemService.MASTER_SPECIFICATION
)
if os.path.exists(path):
return self.__scan_spec(path, FileSystemService.MASTER_SPECIFICATION)
+return None
def get_process_model(
self, process_model_id: str, group_id: Optional[str] = None
) -> Optional[ProcessModelInfo]:
"""Get a process model from a model and group id."""
if not os.path.exists(FileSystemService.root_path()):
-return # Nothing to scan yet. There are no files.
+return None # Nothing to scan yet. There are no files.
master_spec = self.get_master_spec()
if master_spec and master_spec.id == process_model_id:
return master_spec
if group_id is not None:
-for process_model in self.get_process_group(group_id).process_models:
-if process_model_id == process_model.id:
-return process_model
+process_group = self.get_process_group(group_id)
+if process_group is not None:
+for process_model in process_group.process_models:
+if process_model_id == process_model.id:
+return process_model
with os.scandir(FileSystemService.root_path()) as process_group_dirs:
for item in process_group_dirs:
process_group_dir = item
@ -120,40 +123,26 @@ class ProcessModelService(FileSystemService):
process_group_dir
)
return self.__scan_spec(sd.path, sd.name, process_group)
+return None
def get_process_models(
self, process_group_id: Optional[str] = None
) -> List[ProcessModelInfo]:
"""Get process models."""
process_groups = []
if process_group_id is None:
process_groups = self.get_process_groups()
else:
process_group = self.get_process_group(process_group_id)
-process_groups = [
-process_group,
-]
+if process_group is not None:
+process_groups.append(process_group)
process_models = []
for process_group in process_groups:
process_models.extend(process_group.process_models)
return process_models
-def reorder_spec(self, spec: ProcessModelInfo, direction):
-"""Reorder_spec."""
-process_models = spec.process_group.process_models
-process_models.sort(key=lambda w: w.display_order)
-index = process_models.index(spec)
-if direction == "up" and index > 0:
-process_models[index - 1], process_models[index] = (
-process_models[index],
-process_models[index - 1],
-)
-if direction == "down" and index < len(process_models) - 1:
-process_models[index + 1], process_models[index] = (
-process_models[index],
-process_models[index + 1],
-)
-return self.cleanup_workflow_spec_display_order(spec.process_group)
def cleanup_workflow_spec_display_order(
self, process_group: ProcessGroup
) -> List[Union[Any, ProcessModelInfo]]:
@ -169,9 +158,9 @@ class ProcessModelService(FileSystemService):
def get_process_groups(self) -> List[ProcessGroup]:
"""Returns the process_groups as a list in display order."""
-cat_list = self.__scan_process_groups()
-cat_list.sort(key=lambda w: w.display_order)
-return cat_list
+process_groups = self.__scan_process_groups()
+process_groups.sort()
+return process_groups
def get_libraries(self) -> List[ProcessModelInfo]:
"""Get_libraries."""
@ -190,11 +179,12 @@ class ProcessModelService(FileSystemService):
def get_process_group(self, process_group_id: str) -> Optional[ProcessGroup]:
"""Look for a given process_group, and return it."""
if not os.path.exists(FileSystemService.root_path()):
-return # Nothing to scan yet. There are no files.
+return None # Nothing to scan yet. There are no files.
with os.scandir(FileSystemService.root_path()) as directory_items:
for item in directory_items:
if item.is_dir() and item.name == process_group_id:
return self.__scan_process_group(item)
+return None
def add_process_group(self, process_group: ProcessGroup) -> ProcessGroup:
"""Add_process_group."""
@ -218,29 +208,6 @@ class ProcessModelService(FileSystemService):
shutil.rmtree(path)
self.cleanup_process_group_display_order()
-def reorder_workflow_spec_process_group(
-self, process_group: ProcessGroup, direction
-):
-"""Reorder_workflow_spec_process_group."""
-process_groups = self.get_process_groups() # Returns an ordered list
-index = process_groups.index(process_group)
-if direction == "up" and index > 0:
-process_groups[index - 1], process_groups[index] = (
-process_groups[index],
-process_groups[index - 1],
-)
-if direction == "down" and index < len(process_groups) - 1:
-process_groups[index + 1], process_groups[index] = (
-process_groups[index],
-process_groups[index + 1],
-)
-index = 0
-for process_group in process_groups:
-process_group.display_order = index
-self.update_process_group(process_group)
-index += 1
-return process_groups
def cleanup_process_group_display_order(self) -> List[Any]:
"""Cleanup_process_group_display_order."""
process_groups = self.get_process_groups() # Returns an ordered list
@ -251,7 +218,7 @@ class ProcessModelService(FileSystemService):
index += 1
return process_groups
-def __scan_process_groups(self):
+def __scan_process_groups(self) -> list[ProcessGroup]:
"""__scan_process_groups."""
if not os.path.exists(FileSystemService.root_path()):
return [] # Nothing to scan yet. There are no files.
@ -271,13 +238,18 @@ class ProcessModelService(FileSystemService):
process_groups.append(self.__scan_process_group(item))
return process_groups
-def __scan_process_group(self, dir_item: os.DirEntry):
+def __scan_process_group(self, dir_item: os.DirEntry) -> ProcessGroup:
"""Reads the process_group.json file, and any workflow directories."""
cat_path = os.path.join(dir_item.path, self.CAT_JSON_FILE)
if os.path.exists(cat_path):
with open(cat_path) as cat_json:
data = json.load(cat_json)
-process_group = self.GROUP_SCHEMA.load(data)
+process_group = ProcessGroup(**data)
+if process_group is None:
+raise ApiError(
+code="process_group_could_not_be_loaded_from_disk",
+message=f"We could not load the process_group from disk from: {dir_item}",
+)
else:
process_group = ProcessGroup(
id=dir_item.name,
@ -296,19 +268,9 @@ class ProcessModelService(FileSystemService):
item.path, item.name, process_group=process_group
)
)
-process_group.process_models.sort(key=lambda w: w.display_order)
+process_group.process_models.sort()
return process_group
-@staticmethod
-def _get_workflow_metas(study_id):
-"""_get_workflow_metas."""
-# Add in the Workflows for each process_group
-# Fixme: moved fro the Study Service
-workflow_metas = []
-# for workflow in workflow_models:
-# workflow_metas.append(WorkflowMetadata.from_workflow(workflow))
-return workflow_metas
def __scan_spec(self, path: str, name: str, process_group: Optional[ProcessGroup] = None) -> ProcessModelInfo:
"""__scan_spec."""
spec_path = os.path.join(path, self.WF_JSON_FILE)
@ -317,7 +279,7 @@ class ProcessModelService(FileSystemService):
if os.path.exists(spec_path):
with open(spec_path) as wf_json:
data = json.load(wf_json)
-spec = self.WF_SCHEMA.load(data)
+spec = ProcessModelInfo(**data)
if spec is None:
raise ApiError(
code="process_model_could_not_be_loaded_from_disk",

View File

@ -50,7 +50,7 @@ class SpecFileService(FileSystemService):
@staticmethod
def add_file(
-workflow_spec: ProcessModelInfo, file_name: str, binary_data: bytearray
+workflow_spec: ProcessModelInfo, file_name: str, binary_data: bytes
) -> File:
"""Add_file."""
# Same as update
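bytes is the more accurate annotation: file contents read from disk or received in a request arrive as immutable bytes, not a mutable bytearray. For example:

# Reading a spec file yields bytes, matching the new signature
# (hypothetical file path, for illustration).
with open("hello_world.bpmn", "rb") as f:
    binary_data: bytes = f.read()

assert isinstance(binary_data, bytes)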

View File

@ -17,7 +17,6 @@ class ExampleDataLoader:
id: str,
display_name: str = "",
description: str = "",
-filepath: None = None,
master_spec: bool = False,
process_group_id: str = "",
display_order: int = 0,
@ -30,7 +29,6 @@ class ExampleDataLoader:
further assumes that the [id].bpmn is the primary file for the process model.
returns an array of data models to be added to the database.
"""
-global file
spec = ProcessModelInfo(
id=id,
display_name=display_name,
@ -48,19 +46,19 @@ class ExampleDataLoader:
workflow_spec_service = ProcessModelService()
workflow_spec_service.add_spec(spec)
-if not filepath and not from_tests:
-filepath = os.path.join(current_app.root_path, "static", "bpmn", id, "*.*")
-if not filepath and from_tests:
-filepath = os.path.join(
+file_glob = ""
+if from_tests:
+file_glob = os.path.join(
current_app.instance_path, "..", "..", "tests", "data", id, "*.*"
)
+else:
+file_glob = os.path.join(current_app.root_path, "static", "bpmn", id, "*.*")
-files = glob.glob(filepath)
+files = glob.glob(file_glob)
for file_path in files:
if os.path.isdir(file_path):
continue # Don't try to process sub directories
noise, file_extension = os.path.splitext(file_path)
filename = os.path.basename(file_path)
is_primary = filename.lower() == id + ".bpmn"
file = None