precommit w/ burnettk
This commit is contained in:
parent
7bd2a146da
commit
1bc2ca6179
|
@ -5,6 +5,7 @@ from typing import Iterator
|
|||
|
||||
import pytest
|
||||
from flask.app import Flask
|
||||
|
||||
from spiffworkflow_backend.services.process_model_service import ProcessModelService
|
||||
|
||||
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
import enum
|
||||
from dataclasses import dataclass
|
||||
from dataclasses import field
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
|
||||
from flask_bpmn.models.db import db
|
||||
|
@ -12,7 +13,6 @@ from sqlalchemy.orm import deferred
|
|||
from sqlalchemy.orm import relationship
|
||||
|
||||
from spiffworkflow_backend.models.data_store import DataStoreModel
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
class FileModel(SpiffworkflowBaseDBModel):
|
||||
|
@ -116,7 +116,12 @@ class File:
|
|||
|
||||
@classmethod
|
||||
def from_file_system(
|
||||
cls, file_name: str, file_type: FileType, content_type: str, last_modified: datetime, file_size: int
|
||||
cls,
|
||||
file_name: str,
|
||||
file_type: FileType,
|
||||
content_type: str,
|
||||
last_modified: datetime,
|
||||
file_size: int,
|
||||
) -> "File":
|
||||
"""From_file_system."""
|
||||
instance = cls(
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
"""Process_group."""
|
||||
from dataclasses import dataclass
|
||||
from dataclasses import field
|
||||
from typing import Dict, Union, Optional
|
||||
from typing import Dict
|
||||
from typing import Optional
|
||||
from typing import Union
|
||||
|
||||
import marshmallow
|
||||
from marshmallow import post_load
|
||||
|
@ -51,6 +53,8 @@ class ProcessGroupSchema(Schema):
|
|||
)
|
||||
|
||||
@post_load
|
||||
def make_process_group(self, data: Dict[str, Union[str, bool, int]], **kwargs) -> ProcessGroup:
|
||||
def make_process_group(
|
||||
self, data: Dict[str, Union[str, bool, int]], **kwargs
|
||||
) -> ProcessGroup:
|
||||
"""Make_process_group."""
|
||||
return ProcessGroup(**data)
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
"""Process_instance."""
|
||||
import enum
|
||||
from typing import Dict
|
||||
from typing import Union
|
||||
|
||||
import marshmallow
|
||||
from flask_bpmn.models.db import db
|
||||
|
@ -15,7 +17,6 @@ from sqlalchemy.orm import relationship
|
|||
from spiffworkflow_backend.models.process_model import ProcessModelInfo
|
||||
from spiffworkflow_backend.models.task import TaskSchema
|
||||
from spiffworkflow_backend.models.user import UserModel
|
||||
from typing import Dict, Union
|
||||
|
||||
|
||||
class NavigationItemSchema(Schema):
|
||||
|
|
|
@ -1,7 +1,9 @@
|
|||
"""Process_model."""
|
||||
from dataclasses import dataclass
|
||||
from dataclasses import field
|
||||
from typing import Dict, Union, Optional
|
||||
from typing import Dict
|
||||
from typing import Optional
|
||||
from typing import Union
|
||||
|
||||
import marshmallow
|
||||
from marshmallow import post_load
|
||||
|
@ -64,6 +66,8 @@ class ProcessModelInfoSchema(Schema):
|
|||
files = marshmallow.fields.List(marshmallow.fields.Nested("FileSchema"))
|
||||
|
||||
@post_load
|
||||
def make_spec(self, data: Dict[str, Union[str, bool, int]], **kwargs) -> ProcessModelInfo:
|
||||
def make_spec(
|
||||
self, data: Dict[str, Union[str, bool, int]], **kwargs
|
||||
) -> ProcessModelInfo:
|
||||
"""Make_spec."""
|
||||
return ProcessModelInfo(**data)
|
||||
|
|
|
@ -1,12 +1,18 @@
|
|||
"""APIs for dealing with process groups, process models, and process instances."""
|
||||
import json
|
||||
from typing import Dict
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from typing import Union
|
||||
|
||||
import connexion # type: ignore
|
||||
import flask.wrappers
|
||||
from flask import Blueprint
|
||||
from flask import g
|
||||
from flask import Response
|
||||
from flask_bpmn.api.api_error import ApiError
|
||||
from flask_bpmn.models.db import db
|
||||
from werkzeug.datastructures import FileStorage
|
||||
|
||||
from spiffworkflow_backend.models.file import FileSchema
|
||||
from spiffworkflow_backend.models.file import FileType
|
||||
|
@ -23,9 +29,6 @@ from spiffworkflow_backend.services.process_instance_service import (
|
|||
)
|
||||
from spiffworkflow_backend.services.process_model_service import ProcessModelService
|
||||
from spiffworkflow_backend.services.spec_file_service import SpecFileService
|
||||
import flask.wrappers
|
||||
from typing import Dict, List, Optional, Union
|
||||
from werkzeug.datastructures import FileStorage
|
||||
|
||||
# from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflowSerializer # type: ignore
|
||||
# from SpiffWorkflow.camunda.serializer.task_spec_converters import UserTaskConverter # type: ignore
|
||||
|
@ -34,7 +37,9 @@ from werkzeug.datastructures import FileStorage
|
|||
process_api_blueprint = Blueprint("process_api", __name__)
|
||||
|
||||
|
||||
def process_group_add(body: Dict[str, Union[str, bool, int]]) -> flask.wrappers.Response:
|
||||
def process_group_add(
|
||||
body: Dict[str, Union[str, bool, int]]
|
||||
) -> flask.wrappers.Response:
|
||||
"""Add_process_group."""
|
||||
# just so the import is used. oh, and it's imported because spiffworkflow_backend/unit/test_permissions.py
|
||||
# depends on it, and otherwise flask migrations won't include it in the list of database tables.
|
||||
|
@ -56,14 +61,16 @@ def process_group_delete(process_group_id: str) -> flask.wrappers.Response:
|
|||
return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")
|
||||
|
||||
|
||||
def process_group_update(process_group_id: str, body: Dict[str, Union[str, bool, int]]) -> Dict[str, Union[str, bool, int]]:
|
||||
def process_group_update(
|
||||
process_group_id: str, body: Dict[str, Union[str, bool, int]]
|
||||
) -> Dict[str, Union[str, bool, int]]:
|
||||
"""Process Group Update."""
|
||||
process_group = ProcessGroupSchema().load(body)
|
||||
ProcessModelService().update_process_group(process_group)
|
||||
return ProcessGroupSchema().dump(process_group)
|
||||
|
||||
|
||||
def process_groups_list(page: int=1, per_page: int=100) -> flask.wrappers.Response:
|
||||
def process_groups_list(page: int = 1, per_page: int = 100) -> flask.wrappers.Response:
|
||||
"""Process_groups_list."""
|
||||
process_groups = sorted(ProcessModelService().get_process_groups())
|
||||
batch = ProcessModelService().get_batch(process_groups, page, per_page)
|
||||
|
@ -82,13 +89,17 @@ def process_groups_list(page: int=1, per_page: int=100) -> flask.wrappers.Respon
|
|||
return Response(json.dumps(response_json), status=200, mimetype="application/json")
|
||||
|
||||
|
||||
def process_group_show(process_group_id: str) -> Dict[str, Union[List[Dict[str, Union[str, bool, int]]], str, bool, int]]:
|
||||
def process_group_show(
|
||||
process_group_id: str,
|
||||
) -> Dict[str, Union[List[Dict[str, Union[str, bool, int]]], str, bool, int]]:
|
||||
"""Process_group_show."""
|
||||
process_group = ProcessModelService().get_process_group(process_group_id)
|
||||
return ProcessGroupSchema().dump(process_group)
|
||||
|
||||
|
||||
def process_model_add(body: Dict[str, Union[str, bool, int]]) -> flask.wrappers.Response:
|
||||
def process_model_add(
|
||||
body: Dict[str, Union[str, bool, int]]
|
||||
) -> flask.wrappers.Response:
|
||||
"""Add_process_model."""
|
||||
process_model_info = ProcessModelInfoSchema().load(body)
|
||||
process_model_service = ProcessModelService()
|
||||
|
@ -107,20 +118,26 @@ def process_model_add(body: Dict[str, Union[str, bool, int]]) -> flask.wrappers.
|
|||
)
|
||||
|
||||
|
||||
def process_model_delete(process_group_id: str, process_model_id: str) -> flask.wrappers.Response:
|
||||
def process_model_delete(
|
||||
process_group_id: str, process_model_id: str
|
||||
) -> flask.wrappers.Response:
|
||||
"""Process_model_delete."""
|
||||
ProcessModelService().process_model_delete(process_model_id)
|
||||
return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")
|
||||
|
||||
|
||||
def process_model_update(process_group_id: str, process_model_id: str, body: Dict[str, Union[str, bool, int]]) -> Dict[str, Union[str, bool, int]]:
|
||||
def process_model_update(
|
||||
process_group_id: str, process_model_id: str, body: Dict[str, Union[str, bool, int]]
|
||||
) -> Dict[str, Union[str, bool, int]]:
|
||||
"""Process_model_update."""
|
||||
process_model = ProcessModelInfoSchema().load(body)
|
||||
ProcessModelService().update_spec(process_model)
|
||||
return ProcessModelInfoSchema().dump(process_model)
|
||||
|
||||
|
||||
def process_model_show(process_group_id: str, process_model_id: str) -> Dict[str, Union[str, List[Dict[str, Optional[Union[str, int, bool]]]], bool, int]]:
|
||||
def process_model_show(
|
||||
process_group_id: str, process_model_id: str
|
||||
) -> Dict[str, Union[str, List[Dict[str, Optional[Union[str, int, bool]]]], bool, int]]:
|
||||
"""Process_model_show."""
|
||||
process_model = ProcessModelService().get_process_model(
|
||||
process_model_id, group_id=process_group_id
|
||||
|
@ -140,7 +157,9 @@ def process_model_show(process_group_id: str, process_model_id: str) -> Dict[str
|
|||
return process_model_json
|
||||
|
||||
|
||||
def process_model_list(process_group_id: str, page: int=1, per_page: int=100) -> flask.wrappers.Response:
|
||||
def process_model_list(
|
||||
process_group_id: str, page: int = 1, per_page: int = 100
|
||||
) -> flask.wrappers.Response:
|
||||
"""Process model list!"""
|
||||
process_models = sorted(ProcessModelService().get_process_models(process_group_id))
|
||||
batch = ProcessModelService().get_batch(
|
||||
|
@ -162,7 +181,9 @@ def process_model_list(process_group_id: str, page: int=1, per_page: int=100) ->
|
|||
return Response(json.dumps(response_json), status=200, mimetype="application/json")
|
||||
|
||||
|
||||
def get_file(process_group_id: str, process_model_id: str, file_name: str) -> Dict[str, Optional[Union[str, int, bool]]]:
|
||||
def get_file(
|
||||
process_group_id: str, process_model_id: str, file_name: str
|
||||
) -> Dict[str, Optional[Union[str, int, bool]]]:
|
||||
"""Get_file."""
|
||||
process_model = ProcessModelService().get_process_model(
|
||||
process_model_id, group_id=process_group_id
|
||||
|
@ -184,7 +205,9 @@ def get_file(process_group_id: str, process_model_id: str, file_name: str) -> Di
|
|||
return FileSchema().dump(file)
|
||||
|
||||
|
||||
def process_model_file_update(process_group_id: str, process_model_id: str, file_name: str) -> flask.wrappers.Response:
|
||||
def process_model_file_update(
|
||||
process_group_id: str, process_model_id: str, file_name: str
|
||||
) -> flask.wrappers.Response:
|
||||
"""Process_model_file_save."""
|
||||
process_model = ProcessModelService().get_process_model(
|
||||
process_model_id, group_id=process_group_id
|
||||
|
@ -225,7 +248,9 @@ def add_file(process_group_id: str, process_model_id: str) -> flask.wrappers.Res
|
|||
)
|
||||
|
||||
|
||||
def process_instance_create(process_group_id: str, process_model_id: str) -> flask.wrappers.Response:
|
||||
def process_instance_create(
|
||||
process_group_id: str, process_model_id: str
|
||||
) -> flask.wrappers.Response:
|
||||
"""Create_process_instance."""
|
||||
process_instance = ProcessInstanceService.create_process_instance(
|
||||
process_model_id, g.user, process_group_identifier=process_group_id
|
||||
|
@ -247,7 +272,9 @@ def process_instance_create(process_group_id: str, process_model_id: str) -> fla
|
|||
)
|
||||
|
||||
|
||||
def process_instance_list(process_group_id: str, process_model_id: str, page: int=1, per_page: int=100) -> flask.wrappers.Response:
|
||||
def process_instance_list(
|
||||
process_group_id: str, process_model_id: str, page: int = 1, per_page: int = 100
|
||||
) -> flask.wrappers.Response:
|
||||
"""Process_instance_list."""
|
||||
process_model = ProcessModelService().get_process_model(
|
||||
process_model_id, group_id=process_group_id
|
||||
|
@ -305,7 +332,9 @@ def process_instance_delete(process_group_id, process_model_id, process_instance
|
|||
return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")
|
||||
|
||||
|
||||
def process_instance_report(process_group_id: str, process_model_id: str, page: int=1, per_page: int=100) -> flask.wrappers.Response:
|
||||
def process_instance_report(
|
||||
process_group_id: str, process_model_id: str, page: int = 1, per_page: int = 100
|
||||
) -> flask.wrappers.Response:
|
||||
"""Process_instance_list."""
|
||||
process_model = ProcessModelService().get_process_model(
|
||||
process_model_id, group_id=process_group_id
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
"""User."""
|
||||
from typing import Dict, Optional
|
||||
from typing import Dict
|
||||
from typing import Optional
|
||||
|
||||
from flask import current_app
|
||||
from flask import g
|
||||
|
@ -13,7 +14,7 @@ from spiffworkflow_backend.models.user import UserModel
|
|||
"""
|
||||
|
||||
|
||||
def verify_token(token: Optional[str]=None) -> Dict[str, None]:
|
||||
def verify_token(token: Optional[str] = None) -> Dict[str, None]:
|
||||
"""Verify the token for the user (if provided).
|
||||
|
||||
If in production environment and token is not provided, gets user from the SSO headers and returns their token.
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
"""File_system_service."""
|
||||
import os
|
||||
from datetime import datetime
|
||||
from typing import Optional, List
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
|
||||
import pytz
|
||||
from flask import current_app
|
||||
|
@ -11,7 +12,6 @@ from spiffworkflow_backend.models.file import CONTENT_TYPES
|
|||
from spiffworkflow_backend.models.file import File
|
||||
from spiffworkflow_backend.models.file import FileType
|
||||
from spiffworkflow_backend.models.process_model import ProcessModelInfo
|
||||
from posix import DirEntry
|
||||
|
||||
|
||||
class FileSystemService:
|
||||
|
@ -132,7 +132,7 @@ class FileSystemService:
|
|||
return FileType[extension]
|
||||
|
||||
@staticmethod
|
||||
def _get_files(file_path: str, file_name: Optional[str]=None) -> List[File]:
|
||||
def _get_files(file_path: str, file_name: Optional[str] = None) -> List[File]:
|
||||
"""Returns an array of File objects at the given path, can be restricted to just one file."""
|
||||
files = []
|
||||
items = os.scandir(file_path)
|
||||
|
|
|
@ -1,7 +1,11 @@
|
|||
"""Process_instance_processor."""
|
||||
import json
|
||||
import time
|
||||
from typing import Any, Dict, Optional, Union, List
|
||||
from typing import Any
|
||||
from typing import Dict
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from typing import Union
|
||||
|
||||
from flask import current_app
|
||||
from flask_bpmn.api.api_error import ApiError
|
||||
|
@ -11,9 +15,11 @@ from SpiffWorkflow import Task as SpiffTask # type: ignore
|
|||
from SpiffWorkflow import TaskState
|
||||
from SpiffWorkflow import WorkflowException
|
||||
from SpiffWorkflow.bpmn.parser.ValidationException import ValidationException # type: ignore
|
||||
from SpiffWorkflow.bpmn.PythonScriptEngine import Box, PythonScriptEngine # type: ignore
|
||||
from SpiffWorkflow.bpmn.PythonScriptEngine import Box
|
||||
from SpiffWorkflow.bpmn.PythonScriptEngine import PythonScriptEngine
|
||||
from SpiffWorkflow.bpmn.serializer import BpmnWorkflowSerializer # type: ignore
|
||||
from SpiffWorkflow.bpmn.serializer.BpmnSerializer import BpmnSerializer # type: ignore
|
||||
from SpiffWorkflow.bpmn.specs.BpmnProcessSpec import BpmnProcessSpec
|
||||
from SpiffWorkflow.bpmn.specs.events import CancelEventDefinition # type: ignore
|
||||
from SpiffWorkflow.bpmn.specs.events import EndEvent
|
||||
from SpiffWorkflow.bpmn.workflow import BpmnWorkflow # type: ignore
|
||||
|
@ -24,6 +30,7 @@ from SpiffWorkflow.dmn.serializer import BusinessRuleTaskConverter # type: igno
|
|||
from SpiffWorkflow.exceptions import WorkflowTaskExecException # type: ignore
|
||||
from SpiffWorkflow.serializer.exceptions import MissingSpecError # type: ignore
|
||||
from SpiffWorkflow.specs import WorkflowSpec # type: ignore
|
||||
from SpiffWorkflow.task import Task
|
||||
|
||||
from spiffworkflow_backend.models.file import File
|
||||
from spiffworkflow_backend.models.file import FileType
|
||||
|
@ -36,8 +43,6 @@ from spiffworkflow_backend.models.user import UserModelSchema
|
|||
from spiffworkflow_backend.services.process_model_service import ProcessModelService
|
||||
from spiffworkflow_backend.services.spec_file_service import SpecFileService
|
||||
from spiffworkflow_backend.services.user_service import UserService
|
||||
from SpiffWorkflow.bpmn.specs.BpmnProcessSpec import BpmnProcessSpec
|
||||
from SpiffWorkflow.task import Task
|
||||
|
||||
# from crc.services.user_file_service import UserFileService
|
||||
|
||||
|
@ -53,7 +58,13 @@ class CustomBpmnScriptEngine(PythonScriptEngine):
|
|||
"""Evaluate."""
|
||||
return self._evaluate(expression, task.data, task)
|
||||
|
||||
def _evaluate(self, expression: str, context: Dict[str, Union[Box, str]], task: Optional[Task]=None, external_methods: None=None) -> str:
|
||||
def _evaluate(
|
||||
self,
|
||||
expression: str,
|
||||
context: Dict[str, Union[Box, str]],
|
||||
task: Optional[Task] = None,
|
||||
external_methods: None = None,
|
||||
) -> str:
|
||||
"""Evaluate the given expression, within the context of the given task and return the result."""
|
||||
try:
|
||||
return super()._evaluate(expression, context, task, {})
|
||||
|
@ -64,7 +75,9 @@ class CustomBpmnScriptEngine(PythonScriptEngine):
|
|||
"'%s', %s" % (expression, str(exception)),
|
||||
) from exception
|
||||
|
||||
def execute(self, task: SpiffTask, script: str, data: Dict[str, Dict[str, str]]) -> None:
|
||||
def execute(
|
||||
self, task: SpiffTask, script: str, data: Dict[str, Dict[str, str]]
|
||||
) -> None:
|
||||
"""Execute."""
|
||||
try:
|
||||
super().execute(task, script, data)
|
||||
|
@ -95,7 +108,7 @@ class ProcessInstanceProcessor:
|
|||
VALIDATION_PROCESS_KEY = "validate_only"
|
||||
|
||||
def __init__(
|
||||
self, process_instance_model: ProcessInstanceModel, validate_only: bool=False
|
||||
self, process_instance_model: ProcessInstanceModel, validate_only: bool = False
|
||||
) -> None:
|
||||
"""Create a Workflow Processor based on the serialized information available in the process_instance model."""
|
||||
self.process_instance_model = process_instance_model
|
||||
|
@ -389,7 +402,9 @@ class ProcessInstanceProcessor:
|
|||
return parser
|
||||
|
||||
@staticmethod
|
||||
def get_spec(files: List[File], process_model_info: ProcessModelInfo) -> BpmnProcessSpec:
|
||||
def get_spec(
|
||||
files: List[File], process_model_info: ProcessModelInfo
|
||||
) -> BpmnProcessSpec:
|
||||
"""Returns a SpiffWorkflow specification for the given process_instance spec, using the files provided."""
|
||||
parser = ProcessInstanceProcessor.get_parser()
|
||||
|
||||
|
@ -443,7 +458,7 @@ class ProcessInstanceProcessor:
|
|||
"""Get_status."""
|
||||
return self.status_of(self.bpmn_process_instance)
|
||||
|
||||
def do_engine_steps(self, exit_at: None=None) -> None:
|
||||
def do_engine_steps(self, exit_at: None = None) -> None:
|
||||
"""Do_engine_steps."""
|
||||
try:
|
||||
self.bpmn_process_instance.refresh_waiting_tasks()
|
||||
|
|
|
@ -1,12 +1,16 @@
|
|||
"""Process_instance_service."""
|
||||
import time
|
||||
from typing import Any, Dict, Optional, List
|
||||
from typing import Any
|
||||
from typing import Dict
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
|
||||
from flask import current_app
|
||||
from flask_bpmn.models.db import db
|
||||
from SpiffWorkflow import NavItem # type: ignore
|
||||
from SpiffWorkflow.bpmn.specs.ManualTask import ManualTask # type: ignore
|
||||
from SpiffWorkflow.bpmn.specs.UserTask import UserTask # type: ignore
|
||||
from SpiffWorkflow.task import Task
|
||||
from SpiffWorkflow.util.deep_merge import DeepMerge # type: ignore
|
||||
|
||||
from spiffworkflow_backend.models.process_instance import ProcessInstanceApi
|
||||
|
@ -14,13 +18,12 @@ from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
|
|||
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
|
||||
from spiffworkflow_backend.models.task_event import TaskAction
|
||||
from spiffworkflow_backend.models.task_event import TaskEventModel
|
||||
from spiffworkflow_backend.models.user import UserModel
|
||||
from spiffworkflow_backend.services.process_instance_processor import (
|
||||
ProcessInstanceProcessor,
|
||||
)
|
||||
from spiffworkflow_backend.services.process_model_service import ProcessModelService
|
||||
from spiffworkflow_backend.services.user_service import UserService
|
||||
from SpiffWorkflow.task import Task
|
||||
from spiffworkflow_backend.models.user import UserModel
|
||||
|
||||
|
||||
class ProcessInstanceService:
|
||||
|
@ -30,7 +33,9 @@ class ProcessInstanceService:
|
|||
|
||||
@staticmethod
|
||||
def create_process_instance(
|
||||
process_model_identifier: str, user: UserModel, process_group_identifier: Optional[str]=None
|
||||
process_model_identifier: str,
|
||||
user: UserModel,
|
||||
process_group_identifier: Optional[str] = None,
|
||||
) -> ProcessInstanceModel:
|
||||
"""Get_process_instance_from_spec."""
|
||||
process_instance_model = ProcessInstanceModel(
|
||||
|
@ -46,7 +51,7 @@ class ProcessInstanceService:
|
|||
|
||||
@staticmethod
|
||||
def processor_to_process_instance_api(
|
||||
processor: ProcessInstanceProcessor, next_task: None=None
|
||||
processor: ProcessInstanceProcessor, next_task: None = None
|
||||
) -> ProcessInstanceApi:
|
||||
"""Returns an API model representing the state of the current process_instance.
|
||||
|
||||
|
@ -121,7 +126,9 @@ class ProcessInstanceService:
|
|||
ProcessInstanceService.update_navigation(nav_item.children, processor)
|
||||
|
||||
@staticmethod
|
||||
def get_previously_submitted_data(process_instance_id: int, spiff_task: Task) -> Dict[Any, Any]:
|
||||
def get_previously_submitted_data(
|
||||
process_instance_id: int, spiff_task: Task
|
||||
) -> Dict[Any, Any]:
|
||||
"""If the user has completed this task previously, find the form data for the last submission."""
|
||||
query = (
|
||||
db.session.query(TaskEventModel)
|
||||
|
|
|
@ -2,7 +2,10 @@
|
|||
import json
|
||||
import os
|
||||
import shutil
|
||||
from typing import Any, Optional, Union, List
|
||||
from typing import Any
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from typing import Union
|
||||
|
||||
from flask_bpmn.api.api_error import ApiError
|
||||
|
||||
|
@ -27,7 +30,11 @@ class ProcessModelService(FileSystemService):
|
|||
WF_SCHEMA = ProcessModelInfoSchema()
|
||||
|
||||
@staticmethod
|
||||
def get_batch(items: List[Union[Any, ProcessGroup, ProcessModelInfo]], page: int=1, per_page: int=10) -> List[Union[Any, ProcessGroup, ProcessModelInfo]]:
|
||||
def get_batch(
|
||||
items: List[Union[Any, ProcessGroup, ProcessModelInfo]],
|
||||
page: int = 1,
|
||||
per_page: int = 10,
|
||||
) -> List[Union[Any, ProcessGroup, ProcessModelInfo]]:
|
||||
"""Get_batch."""
|
||||
start = (page - 1) * per_page
|
||||
end = start + per_page
|
||||
|
@ -87,7 +94,9 @@ class ProcessModelService(FileSystemService):
|
|||
if os.path.exists(path):
|
||||
return self.__scan_spec(path, FileSystemService.MASTER_SPECIFICATION)
|
||||
|
||||
def get_process_model(self, process_model_id: str, group_id: Optional[str]=None) -> Optional[ProcessModelInfo]:
|
||||
def get_process_model(
|
||||
self, process_model_id: str, group_id: Optional[str] = None
|
||||
) -> Optional[ProcessModelInfo]:
|
||||
"""Get a process model from a model and group id."""
|
||||
if not os.path.exists(FileSystemService.root_path()):
|
||||
return # Nothing to scan yet. There are no files.
|
||||
|
@ -112,7 +121,9 @@ class ProcessModelService(FileSystemService):
|
|||
)
|
||||
return self.__scan_spec(sd.path, sd.name, process_group)
|
||||
|
||||
def get_process_models(self, process_group_id: Optional[str]=None) -> List[ProcessModelInfo]:
|
||||
def get_process_models(
|
||||
self, process_group_id: Optional[str] = None
|
||||
) -> List[ProcessModelInfo]:
|
||||
"""Get process models."""
|
||||
if process_group_id is None:
|
||||
process_groups = self.get_process_groups()
|
||||
|
@ -143,7 +154,9 @@ class ProcessModelService(FileSystemService):
|
|||
)
|
||||
return self.cleanup_workflow_spec_display_order(spec.process_group)
|
||||
|
||||
def cleanup_workflow_spec_display_order(self, process_group: ProcessGroup) -> List[Union[Any, ProcessModelInfo]]:
|
||||
def cleanup_workflow_spec_display_order(
|
||||
self, process_group: ProcessGroup
|
||||
) -> List[Union[Any, ProcessModelInfo]]:
|
||||
"""Cleanup_workflow_spec_display_order."""
|
||||
index = 0
|
||||
if not process_group:
|
||||
|
|
|
@ -2,12 +2,14 @@
|
|||
import os
|
||||
import shutil
|
||||
from datetime import datetime
|
||||
from typing import Optional, List
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from typing import Union
|
||||
|
||||
from flask_bpmn.api.api_error import ApiError
|
||||
from lxml import etree # type: ignore
|
||||
from lxml.etree import _Element, Element as EtreeElement # type: ignore
|
||||
from lxml.etree import _Element
|
||||
from lxml.etree import Element as EtreeElement
|
||||
from SpiffWorkflow.bpmn.parser.ValidationException import ValidationException # type: ignore
|
||||
|
||||
from spiffworkflow_backend.models.file import File
|
||||
|
@ -27,9 +29,9 @@ class SpecFileService(FileSystemService):
|
|||
@staticmethod
|
||||
def get_files(
|
||||
workflow_spec: ProcessModelInfo,
|
||||
file_name: Optional[str]=None,
|
||||
include_libraries: bool=False,
|
||||
extension_filter: str="",
|
||||
file_name: Optional[str] = None,
|
||||
include_libraries: bool = False,
|
||||
extension_filter: str = "",
|
||||
) -> List[File]:
|
||||
"""Return all files associated with a workflow specification."""
|
||||
path = SpecFileService.workflow_path(workflow_spec)
|
||||
|
|
|
@ -1,10 +1,7 @@
|
|||
"""Test Process Api Blueprint."""
|
||||
import io
|
||||
import json
|
||||
import os
|
||||
import shutil
|
||||
from typing import Dict
|
||||
from typing import Iterator
|
||||
from typing import Optional
|
||||
from typing import Union
|
||||
|
||||
|
|
|
@ -5,7 +5,9 @@ from spiffworkflow_backend.models.process_group import ProcessGroup
|
|||
from spiffworkflow_backend.services.process_model_service import ProcessModelService
|
||||
|
||||
|
||||
def test_there_is_at_least_one_group_after_we_create_one(app: Flask, with_bpmn_file_cleanup: None) -> None:
|
||||
def test_there_is_at_least_one_group_after_we_create_one(
|
||||
app: Flask, with_bpmn_file_cleanup: None
|
||||
) -> None:
|
||||
"""Test_there_is_at_least_one_group_after_we_create_one."""
|
||||
process_model_service = ProcessModelService()
|
||||
process_group = ProcessGroup(id="hey", display_name="sure")
|
||||
|
|
Loading…
Reference in New Issue