monkeytype ftw w/ burnettk

This commit is contained in:
jasquat 2022-06-21 15:13:02 -04:00
parent 38c0d780c2
commit 7bd2a146da
15 changed files with 126 additions and 112 deletions

View File

@ -1,8 +1,11 @@
"""Conftest."""
import os
import shutil
from typing import Iterator
import pytest
from flask.app import Flask
from spiffworkflow_backend.services.process_model_service import ProcessModelService
# We need to call this before importing spiffworkflow_backend
@ -32,3 +35,14 @@ def app() -> Flask:
)
return app
@pytest.fixture()
def with_bpmn_file_cleanup() -> Iterator[None]:
"""Process_group_resource."""
try:
yield
finally:
process_model_service = ProcessModelService()
if os.path.exists(process_model_service.root_path()):
shutil.rmtree(process_model_service.root_path())

View File

@ -6,7 +6,7 @@ from flask.app import Flask
from werkzeug.utils import ImportStringError
def setup_logger_for_sql_statements(app: Flask):
def setup_logger_for_sql_statements(app: Flask) -> None:
"""Setup_logger_for_sql_statements."""
db_log_file_name = f"log/db_{app.env}.log"
db_handler_log_level = logging.INFO
@ -19,7 +19,7 @@ def setup_logger_for_sql_statements(app: Flask):
db_logger.setLevel(db_logger_log_level)
def setup_database_uri(app: Flask):
def setup_database_uri(app: Flask) -> None:
"""Setup_database_uri."""
if os.environ.get("DATABASE_URI") is None:
if os.environ.get("SPIFF_DATABASE_TYPE") == "sqlite":

View File

@ -12,6 +12,7 @@ from sqlalchemy.orm import deferred
from sqlalchemy.orm import relationship
from spiffworkflow_backend.models.data_store import DataStoreModel
from datetime import datetime
class FileModel(SpiffworkflowBaseDBModel):
@ -109,14 +110,14 @@ class File:
process_group_id: Optional[str] = None
archived: bool = False
def __post_init__(self):
def __post_init__(self) -> None:
"""__post_init__."""
self.sort_index = f"{self.type}:{self.name}"
@classmethod
def from_file_system(
cls, file_name, file_type, content_type, last_modified, file_size
):
cls, file_name: str, file_type: FileType, content_type: str, last_modified: datetime, file_size: int
) -> "File":
"""From_file_system."""
instance = cls(
name=file_name,

View File

@ -1,7 +1,7 @@
"""Process_group."""
from dataclasses import dataclass
from dataclasses import field
from typing import Optional
from typing import Dict, Union, Optional
import marshmallow
from marshmallow import post_load
@ -22,7 +22,7 @@ class ProcessGroup:
admin: Optional[bool] = False
process_models: Optional[list[ProcessModelInfo]] = field(default_factory=list)
def __post_init__(self):
def __post_init__(self) -> None:
"""__post_init__."""
self.sort_index = f"{self.display_order}:{self.id}"
@ -51,6 +51,6 @@ class ProcessGroupSchema(Schema):
)
@post_load
def make_process_group(self, data, **kwargs):
def make_process_group(self, data: Dict[str, Union[str, bool, int]], **kwargs) -> ProcessGroup:
"""Make_process_group."""
return ProcessGroup(**data)

View File

@ -15,6 +15,7 @@ from sqlalchemy.orm import relationship
from spiffworkflow_backend.models.process_model import ProcessModelInfo
from spiffworkflow_backend.models.task import TaskSchema
from spiffworkflow_backend.models.user import UserModel
from typing import Dict, Union
class NavigationItemSchema(Schema):
@ -87,7 +88,7 @@ class ProcessInstanceModel(SpiffworkflowBaseDBModel): # type: ignore
status = db.Column(db.Enum(ProcessInstanceStatus))
@property
def serialized(self):
def serialized(self) -> Dict[str, Union[int, str]]:
"""Return object data in serializeable format."""
return {
"id": self.id,
@ -106,17 +107,17 @@ class ProcessInstanceApi:
def __init__(
self,
id,
status,
next_task,
process_model_identifier,
process_group_identifier,
total_tasks,
completed_tasks,
updated_at_in_seconds,
is_review,
title,
):
id: int,
status: ProcessInstanceStatus,
next_task: None,
process_model_identifier: str,
process_group_identifier: str,
total_tasks: int,
completed_tasks: int,
updated_at_in_seconds: int,
is_review: bool,
title: str,
) -> None:
"""__init__."""
self.id = id
self.status = status

View File

@ -1,7 +1,7 @@
"""Process_model."""
from dataclasses import dataclass
from dataclasses import field
from typing import Optional
from typing import Dict, Union, Optional
import marshmallow
from marshmallow import post_load
@ -28,7 +28,7 @@ class ProcessModelInfo:
is_review: Optional[bool] = False
files: Optional[list[str]] = field(default_factory=list)
def __post_init__(self):
def __post_init__(self) -> None:
"""__post_init__."""
self.sort_index = f"{self.process_group_id}:{self.id}"
@ -64,6 +64,6 @@ class ProcessModelInfoSchema(Schema):
files = marshmallow.fields.List(marshmallow.fields.Nested("FileSchema"))
@post_load
def make_spec(self, data, **kwargs):
def make_spec(self, data: Dict[str, Union[str, bool, int]], **kwargs) -> ProcessModelInfo:
"""Make_spec."""
return ProcessModelInfo(**data)

View File

@ -23,6 +23,9 @@ from spiffworkflow_backend.services.process_instance_service import (
)
from spiffworkflow_backend.services.process_model_service import ProcessModelService
from spiffworkflow_backend.services.spec_file_service import SpecFileService
import flask.wrappers
from typing import Dict, List, Optional, Union
from werkzeug.datastructures import FileStorage
# from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflowSerializer # type: ignore
# from SpiffWorkflow.camunda.serializer.task_spec_converters import UserTaskConverter # type: ignore
@ -31,7 +34,7 @@ from spiffworkflow_backend.services.spec_file_service import SpecFileService
process_api_blueprint = Blueprint("process_api", __name__)
def process_group_add(body):
def process_group_add(body: Dict[str, Union[str, bool, int]]) -> flask.wrappers.Response:
"""Add_process_group."""
# just so the import is used. oh, and it's imported because spiffworkflow_backend/unit/test_permissions.py
# depends on it, and otherwise flask migrations won't include it in the list of database tables.
@ -47,20 +50,20 @@ def process_group_add(body):
)
def process_group_delete(process_group_id):
def process_group_delete(process_group_id: str) -> flask.wrappers.Response:
"""Process_group_delete."""
ProcessModelService().process_group_delete(process_group_id)
return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")
def process_group_update(process_group_id, body):
def process_group_update(process_group_id: str, body: Dict[str, Union[str, bool, int]]) -> Dict[str, Union[str, bool, int]]:
"""Process Group Update."""
process_group = ProcessGroupSchema().load(body)
ProcessModelService().update_process_group(process_group)
return ProcessGroupSchema().dump(process_group)
def process_groups_list(page=1, per_page=100):
def process_groups_list(page: int=1, per_page: int=100) -> flask.wrappers.Response:
"""Process_groups_list."""
process_groups = sorted(ProcessModelService().get_process_groups())
batch = ProcessModelService().get_batch(process_groups, page, per_page)
@ -79,13 +82,13 @@ def process_groups_list(page=1, per_page=100):
return Response(json.dumps(response_json), status=200, mimetype="application/json")
def process_group_show(process_group_id):
def process_group_show(process_group_id: str) -> Dict[str, Union[List[Dict[str, Union[str, bool, int]]], str, bool, int]]:
"""Process_group_show."""
process_group = ProcessModelService().get_process_group(process_group_id)
return ProcessGroupSchema().dump(process_group)
def process_model_add(body):
def process_model_add(body: Dict[str, Union[str, bool, int]]) -> flask.wrappers.Response:
"""Add_process_model."""
process_model_info = ProcessModelInfoSchema().load(body)
process_model_service = ProcessModelService()
@ -104,20 +107,20 @@ def process_model_add(body):
)
def process_model_delete(process_group_id, process_model_id):
def process_model_delete(process_group_id: str, process_model_id: str) -> flask.wrappers.Response:
"""Process_model_delete."""
ProcessModelService().process_model_delete(process_model_id)
return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")
def process_model_update(process_group_id, process_model_id, body):
def process_model_update(process_group_id: str, process_model_id: str, body: Dict[str, Union[str, bool, int]]) -> Dict[str, Union[str, bool, int]]:
"""Process_model_update."""
process_model = ProcessModelInfoSchema().load(body)
ProcessModelService().update_spec(process_model)
return ProcessModelInfoSchema().dump(process_model)
def process_model_show(process_group_id, process_model_id):
def process_model_show(process_group_id: str, process_model_id: str) -> Dict[str, Union[str, List[Dict[str, Optional[Union[str, int, bool]]]], bool, int]]:
"""Process_model_show."""
process_model = ProcessModelService().get_process_model(
process_model_id, group_id=process_group_id
@ -137,7 +140,7 @@ def process_model_show(process_group_id, process_model_id):
return process_model_json
def process_model_list(process_group_id, page=1, per_page=100):
def process_model_list(process_group_id: str, page: int=1, per_page: int=100) -> flask.wrappers.Response:
"""Process model list!"""
process_models = sorted(ProcessModelService().get_process_models(process_group_id))
batch = ProcessModelService().get_batch(
@ -159,7 +162,7 @@ def process_model_list(process_group_id, page=1, per_page=100):
return Response(json.dumps(response_json), status=200, mimetype="application/json")
def get_file(process_group_id, process_model_id, file_name):
def get_file(process_group_id: str, process_model_id: str, file_name: str) -> Dict[str, Optional[Union[str, int, bool]]]:
"""Get_file."""
process_model = ProcessModelService().get_process_model(
process_model_id, group_id=process_group_id
@ -181,7 +184,7 @@ def get_file(process_group_id, process_model_id, file_name):
return FileSchema().dump(file)
def process_model_file_update(process_group_id, process_model_id, file_name):
def process_model_file_update(process_group_id: str, process_model_id: str, file_name: str) -> flask.wrappers.Response:
"""Process_model_file_save."""
process_model = ProcessModelService().get_process_model(
process_model_id, group_id=process_group_id
@ -200,7 +203,7 @@ def process_model_file_update(process_group_id, process_model_id, file_name):
return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")
def add_file(process_group_id, process_model_id):
def add_file(process_group_id: str, process_model_id: str) -> flask.wrappers.Response:
"""Add_file."""
process_model_service = ProcessModelService()
process_model = process_model_service.get_process_model(
@ -222,7 +225,7 @@ def add_file(process_group_id, process_model_id):
)
def process_instance_create(process_group_id, process_model_id):
def process_instance_create(process_group_id: str, process_model_id: str) -> flask.wrappers.Response:
"""Create_process_instance."""
process_instance = ProcessInstanceService.create_process_instance(
process_model_id, g.user, process_group_identifier=process_group_id
@ -244,7 +247,7 @@ def process_instance_create(process_group_id, process_model_id):
)
def process_instance_list(process_group_id, process_model_id, page=1, per_page=100):
def process_instance_list(process_group_id: str, process_model_id: str, page: int=1, per_page: int=100) -> flask.wrappers.Response:
"""Process_instance_list."""
process_model = ProcessModelService().get_process_model(
process_model_id, group_id=process_group_id
@ -302,7 +305,7 @@ def process_instance_delete(process_group_id, process_model_id, process_instance
return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")
def process_instance_report(process_group_id, process_model_id, page=1, per_page=100):
def process_instance_report(process_group_id: str, process_model_id: str, page: int=1, per_page: int=100) -> flask.wrappers.Response:
"""Process_instance_list."""
process_model = ProcessModelService().get_process_model(
process_model_id, group_id=process_group_id
@ -344,7 +347,7 @@ def process_instance_report(process_group_id, process_model_id, page=1, per_page
return Response(json.dumps(response_json), status=200, mimetype="application/json")
def get_file_from_request():
def get_file_from_request() -> FileStorage:
"""Get_file_from_request."""
request_file = connexion.request.files.get("file")
if not request_file:

View File

@ -1,4 +1,6 @@
"""User."""
from typing import Dict, Optional
from flask import current_app
from flask import g
from flask_bpmn.api.api_error import ApiError
@ -11,7 +13,7 @@ from spiffworkflow_backend.models.user import UserModel
"""
def verify_token(token=None):
def verify_token(token: Optional[str]=None) -> Dict[str, None]:
"""Verify the token for the user (if provided).
If in production environment and token is not provided, gets user from the SSO headers and returns their token.

View File

@ -1,7 +1,7 @@
"""File_system_service."""
import os
from datetime import datetime
from typing import List
from typing import Optional, List
import pytz
from flask import current_app
@ -11,6 +11,7 @@ from spiffworkflow_backend.models.file import CONTENT_TYPES
from spiffworkflow_backend.models.file import File
from spiffworkflow_backend.models.file import FileType
from spiffworkflow_backend.models.process_model import ProcessModelInfo
from posix import DirEntry
class FileSystemService:
@ -28,7 +29,7 @@ class FileSystemService:
WF_JSON_FILE = "workflow.json"
@staticmethod
def root_path():
def root_path() -> str:
"""Root_path."""
# fixme: allow absolute files
dir_name = current_app.config["BPMN_SPEC_ABSOLUTE_DIR"]
@ -36,7 +37,7 @@ class FileSystemService:
return os.path.join(app_root, "..", dir_name)
@staticmethod
def process_group_path(name: str):
def process_group_path(name: str) -> str:
"""Category_path."""
return os.path.join(FileSystemService.root_path(), name)
@ -48,7 +49,7 @@ class FileSystemService:
)
@staticmethod
def process_group_path_for_spec(spec):
def process_group_path_for_spec(spec: ProcessModelInfo) -> str:
"""Category_path_for_spec."""
if spec.is_master_spec:
return os.path.join(FileSystemService.root_path())
@ -67,7 +68,7 @@ class FileSystemService:
return process_group_path
@staticmethod
def workflow_path(spec: ProcessModelInfo):
def workflow_path(spec: ProcessModelInfo) -> str:
"""Workflow_path."""
if spec.is_master_spec:
return os.path.join(
@ -77,7 +78,7 @@ class FileSystemService:
process_group_path = FileSystemService.process_group_path_for_spec(spec)
return os.path.join(process_group_path, spec.id)
def next_display_order(self, spec):
def next_display_order(self, spec: ProcessModelInfo) -> int:
"""Next_display_order."""
path = self.process_group_path_for_spec(spec)
if os.path.exists(path):
@ -86,14 +87,14 @@ class FileSystemService:
return 0
@staticmethod
def write_file_data_to_system(file_path: str, file_data):
def write_file_data_to_system(file_path: str, file_data: bytes) -> None:
"""Write_file_data_to_system."""
os.makedirs(os.path.dirname(file_path), exist_ok=True)
with open(file_path, "wb") as f_handle:
f_handle.write(file_data)
@staticmethod
def get_extension(file_name) -> str:
def get_extension(file_name: str) -> str:
"""Get_extension."""
_, file_extension = os.path.splitext(file_name)
return file_extension.lower().strip()[1:]
@ -125,13 +126,13 @@ class FileSystemService:
return aware_utc_dt
@staticmethod
def file_type(file_name):
def file_type(file_name: str) -> FileType:
"""File_type."""
extension = FileSystemService.get_extension(file_name)
return FileType[extension]
@staticmethod
def _get_files(file_path: str, file_name=None) -> List[File]:
def _get_files(file_path: str, file_name: Optional[str]=None) -> List[File]:
"""Returns an array of File objects at the given path, can be restricted to just one file."""
files = []
items = os.scandir(file_path)
@ -160,7 +161,7 @@ class FileSystemService:
return file
@staticmethod
def to_file_object_from_dir_entry(item: os.DirEntry):
def to_file_object_from_dir_entry(item: os.DirEntry) -> File:
"""To_file_object_from_dir_entry."""
extension = FileSystemService.get_extension(item.name)
try:

View File

@ -1,7 +1,7 @@
"""Process_instance_processor."""
import json
import time
from typing import List
from typing import Any, Dict, Optional, Union, List
from flask import current_app
from flask_bpmn.api.api_error import ApiError
@ -11,7 +11,7 @@ from SpiffWorkflow import Task as SpiffTask # type: ignore
from SpiffWorkflow import TaskState
from SpiffWorkflow import WorkflowException
from SpiffWorkflow.bpmn.parser.ValidationException import ValidationException # type: ignore
from SpiffWorkflow.bpmn.PythonScriptEngine import PythonScriptEngine # type: ignore
from SpiffWorkflow.bpmn.PythonScriptEngine import Box, PythonScriptEngine # type: ignore
from SpiffWorkflow.bpmn.serializer import BpmnWorkflowSerializer # type: ignore
from SpiffWorkflow.bpmn.serializer.BpmnSerializer import BpmnSerializer # type: ignore
from SpiffWorkflow.bpmn.specs.events import CancelEventDefinition # type: ignore
@ -36,6 +36,8 @@ from spiffworkflow_backend.models.user import UserModelSchema
from spiffworkflow_backend.services.process_model_service import ProcessModelService
from spiffworkflow_backend.services.spec_file_service import SpecFileService
from spiffworkflow_backend.services.user_service import UserService
from SpiffWorkflow.bpmn.specs.BpmnProcessSpec import BpmnProcessSpec
from SpiffWorkflow.task import Task
# from crc.services.user_file_service import UserFileService
@ -47,11 +49,11 @@ class CustomBpmnScriptEngine(PythonScriptEngine):
scripts directory available for execution.
"""
def evaluate(self, task, expression):
def evaluate(self, task: Task, expression: str) -> str:
"""Evaluate."""
return self._evaluate(expression, task.data, task)
def _evaluate(self, expression, context, task=None, external_methods=None):
def _evaluate(self, expression: str, context: Dict[str, Union[Box, str]], task: Optional[Task]=None, external_methods: None=None) -> str:
"""Evaluate the given expression, within the context of the given task and return the result."""
try:
return super()._evaluate(expression, context, task, {})
@ -62,7 +64,7 @@ class CustomBpmnScriptEngine(PythonScriptEngine):
"'%s', %s" % (expression, str(exception)),
) from exception
def execute(self, task: SpiffTask, script, data):
def execute(self, task: SpiffTask, script: str, data: Dict[str, Dict[str, str]]) -> None:
"""Execute."""
try:
super().execute(task, script, data)
@ -93,8 +95,8 @@ class ProcessInstanceProcessor:
VALIDATION_PROCESS_KEY = "validate_only"
def __init__(
self, process_instance_model: ProcessInstanceModel, validate_only=False
):
self, process_instance_model: ProcessInstanceModel, validate_only: bool=False
) -> None:
"""Create a Workflow Processor based on the serialized information available in the process_instance model."""
self.process_instance_model = process_instance_model
self.process_model_service = ProcessModelService()
@ -200,7 +202,7 @@ class ProcessInstanceProcessor:
) from ke
@staticmethod
def add_user_info_to_process_instance(bpmn_process_instance):
def add_user_info_to_process_instance(bpmn_process_instance: BpmnWorkflow) -> None:
"""Add_user_info_to_process_instance."""
if UserService.has_user():
current_user = UserService.current_user(allow_admin_impersonate=True)
@ -327,7 +329,7 @@ class ProcessInstanceProcessor:
] = validate_only
return bpmn_process_instance
def save(self):
def save(self) -> None:
"""Saves the current state of this processor to the database."""
self.process_instance_model.bpmn_json = self.serialize()
complete_states = [TaskState.CANCELLED, TaskState.COMPLETED]
@ -381,13 +383,13 @@ class ProcessInstanceProcessor:
return bpmn_process_instance.last_task.data
@staticmethod
def get_parser():
def get_parser() -> MyCustomParser:
"""Get_parser."""
parser = MyCustomParser()
return parser
@staticmethod
def get_spec(files: List[File], process_model_info: ProcessModelInfo):
def get_spec(files: List[File], process_model_info: ProcessModelInfo) -> BpmnProcessSpec:
"""Returns a SpiffWorkflow specification for the given process_instance spec, using the files provided."""
parser = ProcessInstanceProcessor.get_parser()
@ -424,7 +426,7 @@ class ProcessInstanceProcessor:
return spec
@staticmethod
def status_of(bpmn_process_instance):
def status_of(bpmn_process_instance: BpmnWorkflow) -> ProcessInstanceStatus:
"""Status_of."""
if bpmn_process_instance.is_completed():
return ProcessInstanceStatus.complete
@ -437,11 +439,11 @@ class ProcessInstanceProcessor:
else:
return ProcessInstanceStatus.waiting
def get_status(self):
def get_status(self) -> ProcessInstanceStatus:
"""Get_status."""
return self.status_of(self.bpmn_process_instance)
def do_engine_steps(self, exit_at=None):
def do_engine_steps(self, exit_at: None=None) -> None:
"""Do_engine_steps."""
try:
self.bpmn_process_instance.refresh_waiting_tasks()
@ -464,7 +466,7 @@ class ProcessInstanceProcessor:
except WorkflowTaskExecException as we:
raise ApiError.from_workflow_exception("task_error", str(we), we) from we
def serialize(self):
def serialize(self) -> str:
"""Serialize."""
return self._serializer.serialize_json(self.bpmn_process_instance)
@ -472,7 +474,7 @@ class ProcessInstanceProcessor:
"""Next_user_tasks."""
return self.bpmn_process_instance.get_ready_user_tasks()
def next_task(self):
def next_task(self) -> Task:
"""Returns the next task that should be completed even if there are parallel tasks and multiple options are available.
If the process_instance is complete
@ -541,7 +543,7 @@ class ProcessInstanceProcessor:
next_task = task
return next_task
def completed_user_tasks(self):
def completed_user_tasks(self) -> List[Any]:
"""Completed_user_tasks."""
completed_user_tasks = self.bpmn_process_instance.get_tasks(TaskState.COMPLETED)
completed_user_tasks.reverse()
@ -567,7 +569,7 @@ class ProcessInstanceProcessor:
"""Get_data."""
return self.bpmn_process_instance.data
def get_process_instance_id(self):
def get_process_instance_id(self) -> int:
"""Get_process_instance_id."""
return self.process_instance_model.id
@ -594,7 +596,7 @@ class ProcessInstanceProcessor:
additional_tasks.append(child)
return ready_tasks + additional_tasks
def get_all_user_tasks(self):
def get_all_user_tasks(self) -> List[Union[Task, Any]]:
"""Get_all_user_tasks."""
all_tasks = self.bpmn_process_instance.get_tasks(TaskState.ANY_MASK)
return [

View File

@ -1,6 +1,6 @@
"""Process_instance_service."""
import time
from typing import List
from typing import Any, Dict, Optional, List
from flask import current_app
from flask_bpmn.models.db import db
@ -19,6 +19,8 @@ from spiffworkflow_backend.services.process_instance_processor import (
)
from spiffworkflow_backend.services.process_model_service import ProcessModelService
from spiffworkflow_backend.services.user_service import UserService
from SpiffWorkflow.task import Task
from spiffworkflow_backend.models.user import UserModel
class ProcessInstanceService:
@ -28,8 +30,8 @@ class ProcessInstanceService:
@staticmethod
def create_process_instance(
process_model_identifier, user, process_group_identifier=None
):
process_model_identifier: str, user: UserModel, process_group_identifier: Optional[str]=None
) -> ProcessInstanceModel:
"""Get_process_instance_from_spec."""
process_instance_model = ProcessInstanceModel(
status=ProcessInstanceStatus.not_started,
@ -44,8 +46,8 @@ class ProcessInstanceService:
@staticmethod
def processor_to_process_instance_api(
processor: ProcessInstanceProcessor, next_task=None
):
processor: ProcessInstanceProcessor, next_task: None=None
) -> ProcessInstanceApi:
"""Returns an API model representing the state of the current process_instance.
If requested, and possible, next_task is set to the current_task.
@ -119,7 +121,7 @@ class ProcessInstanceService:
ProcessInstanceService.update_navigation(nav_item.children, processor)
@staticmethod
def get_previously_submitted_data(process_instance_id, spiff_task):
def get_previously_submitted_data(process_instance_id: int, spiff_task: Task) -> Dict[Any, Any]:
"""If the user has completed this task previously, find the form data for the last submission."""
query = (
db.session.query(TaskEventModel)

View File

@ -2,7 +2,7 @@
import json
import os
import shutil
from typing import List
from typing import Any, Optional, Union, List
from flask_bpmn.api.api_error import ApiError
@ -27,19 +27,19 @@ class ProcessModelService(FileSystemService):
WF_SCHEMA = ProcessModelInfoSchema()
@staticmethod
def get_batch(items, page=1, per_page=10):
def get_batch(items: List[Union[Any, ProcessGroup, ProcessModelInfo]], page: int=1, per_page: int=10) -> List[Union[Any, ProcessGroup, ProcessModelInfo]]:
"""Get_batch."""
start = (page - 1) * per_page
end = start + per_page
return items[start:end]
def add_spec(self, spec: ProcessModelInfo):
def add_spec(self, spec: ProcessModelInfo) -> None:
"""Add_spec."""
display_order = self.next_display_order(spec)
spec.display_order = display_order
self.update_spec(spec)
def update_spec(self, spec: ProcessModelInfo):
def update_spec(self, spec: ProcessModelInfo) -> None:
"""Update_spec."""
spec_path = self.workflow_path(spec)
if spec.is_master_spec or spec.library or spec.standalone:
@ -49,7 +49,7 @@ class ProcessModelService(FileSystemService):
with open(json_path, "w") as wf_json:
json.dump(self.WF_SCHEMA.dump(spec), wf_json, indent=4)
def process_model_delete(self, process_model_id: str):
def process_model_delete(self, process_model_id: str) -> None:
"""Delete Procecss Model."""
instances = ProcessInstanceModel.query.filter(
ProcessInstanceModel.process_model_identifier == process_model_id
@ -79,7 +79,7 @@ class ProcessModelService(FileSystemService):
"""Master_spec."""
return self.get_master_spec()
def get_master_spec(self):
def get_master_spec(self) -> None:
"""Get_master_spec."""
path = os.path.join(
FileSystemService.root_path(), FileSystemService.MASTER_SPECIFICATION
@ -87,7 +87,7 @@ class ProcessModelService(FileSystemService):
if os.path.exists(path):
return self.__scan_spec(path, FileSystemService.MASTER_SPECIFICATION)
def get_process_model(self, process_model_id, group_id=None):
def get_process_model(self, process_model_id: str, group_id: Optional[str]=None) -> Optional[ProcessModelInfo]:
"""Get a process model from a model and group id."""
if not os.path.exists(FileSystemService.root_path()):
return # Nothing to scan yet. There are no files.
@ -112,7 +112,7 @@ class ProcessModelService(FileSystemService):
)
return self.__scan_spec(sd.path, sd.name, process_group)
def get_process_models(self, process_group_id=None):
def get_process_models(self, process_group_id: Optional[str]=None) -> List[ProcessModelInfo]:
"""Get process models."""
if process_group_id is None:
process_groups = self.get_process_groups()
@ -143,7 +143,7 @@ class ProcessModelService(FileSystemService):
)
return self.cleanup_workflow_spec_display_order(spec.process_group)
def cleanup_workflow_spec_display_order(self, process_group):
def cleanup_workflow_spec_display_order(self, process_group: ProcessGroup) -> List[Union[Any, ProcessModelInfo]]:
"""Cleanup_workflow_spec_display_order."""
index = 0
if not process_group:
@ -174,7 +174,7 @@ class ProcessModelService(FileSystemService):
return []
return process_group.process_models
def get_process_group(self, process_group_id):
def get_process_group(self, process_group_id: str) -> Optional[ProcessGroup]:
"""Look for a given process_group, and return it."""
if not os.path.exists(FileSystemService.root_path()):
return # Nothing to scan yet. There are no files.
@ -183,13 +183,13 @@ class ProcessModelService(FileSystemService):
if item.is_dir() and item.name == process_group_id:
return self.__scan_process_group(item)
def add_process_group(self, process_group: ProcessGroup):
def add_process_group(self, process_group: ProcessGroup) -> ProcessGroup:
"""Add_process_group."""
display_order = len(self.get_process_groups())
process_group.display_order = display_order
return self.update_process_group(process_group)
def update_process_group(self, process_group: ProcessGroup):
def update_process_group(self, process_group: ProcessGroup) -> ProcessGroup:
"""Update_process_group."""
cat_path = self.process_group_path(process_group.id)
os.makedirs(cat_path, exist_ok=True)
@ -198,7 +198,7 @@ class ProcessModelService(FileSystemService):
json.dump(self.GROUP_SCHEMA.dump(process_group), cat_json, indent=4)
return process_group
def process_group_delete(self, process_group_id: str):
def process_group_delete(self, process_group_id: str) -> None:
"""Delete_process_group."""
path = self.process_group_path(process_group_id)
if os.path.exists(path):
@ -228,7 +228,7 @@ class ProcessModelService(FileSystemService):
index += 1
return process_groups
def cleanup_process_group_display_order(self):
def cleanup_process_group_display_order(self) -> List[Any]:
"""Cleanup_process_group_display_order."""
process_groups = self.get_process_groups() # Returns an ordered list
index = 0

View File

@ -2,12 +2,12 @@
import os
import shutil
from datetime import datetime
from typing import List
from typing import Optional, List
from typing import Union
from flask_bpmn.api.api_error import ApiError
from lxml import etree # type: ignore
from lxml.etree import Element as EtreeElement # type: ignore
from lxml.etree import _Element, Element as EtreeElement # type: ignore
from SpiffWorkflow.bpmn.parser.ValidationException import ValidationException # type: ignore
from spiffworkflow_backend.models.file import File
@ -27,9 +27,9 @@ class SpecFileService(FileSystemService):
@staticmethod
def get_files(
workflow_spec: ProcessModelInfo,
file_name=None,
include_libraries=False,
extension_filter="",
file_name: Optional[str]=None,
include_libraries: bool=False,
extension_filter: str="",
) -> List[File]:
"""Return all files associated with a workflow specification."""
path = SpecFileService.workflow_path(workflow_spec)
@ -56,7 +56,7 @@ class SpecFileService(FileSystemService):
@staticmethod
def update_file(
workflow_spec: ProcessModelInfo, file_name: str, binary_data
workflow_spec: ProcessModelInfo, file_name: str, binary_data: bytes
) -> File:
"""Update_file."""
SpecFileService.assert_valid_file_name(file_name)
@ -169,7 +169,7 @@ class SpecFileService(FileSystemService):
)
@staticmethod
def has_swimlane(et_root: EtreeElement) -> bool:
def has_swimlane(et_root: _Element) -> bool:
"""Look through XML and determine if there are any lanes present that have a label."""
elements = et_root.xpath(
"//bpmn:lane",
@ -182,7 +182,7 @@ class SpecFileService(FileSystemService):
return retval
@staticmethod
def get_process_id(et_root: EtreeElement) -> str:
def get_process_id(et_root: _Element) -> str:
"""Get_process_id."""
process_elements = []
for child in et_root:

View File

@ -8,7 +8,6 @@ from typing import Iterator
from typing import Optional
from typing import Union
import pytest
from flask.app import Flask
from flask.testing import FlaskClient
from flask_bpmn.models.db import db
@ -26,17 +25,6 @@ from spiffworkflow_backend.models.process_model import ProcessModelInfoSchema
from spiffworkflow_backend.services.process_model_service import ProcessModelService
@pytest.fixture()
def with_bpmn_file_cleanup() -> Iterator[None]:
"""Process_group_resource."""
try:
yield
finally:
process_model_service = ProcessModelService()
if os.path.exists(process_model_service.root_path()):
shutil.rmtree(process_model_service.root_path())
# phase 1: req_id: 7.1 Deploy process
def test_process_model_add(
app: Flask, client: FlaskClient, with_bpmn_file_cleanup: None

View File

@ -5,7 +5,7 @@ from spiffworkflow_backend.models.process_group import ProcessGroup
from spiffworkflow_backend.services.process_model_service import ProcessModelService
def test_there_is_at_least_one_group_after_we_create_one(app: Flask) -> None:
def test_there_is_at_least_one_group_after_we_create_one(app: Flask, with_bpmn_file_cleanup: None) -> None:
"""Test_there_is_at_least_one_group_after_we_create_one."""
process_model_service = ProcessModelService()
process_group = ProcessGroup(id="hey", display_name="sure")