diff --git a/conftest.py b/conftest.py
index 4e073ddf..96b06c8f 100644
--- a/conftest.py
+++ b/conftest.py
@@ -1,8 +1,11 @@
 """Conftest."""
 import os
+import shutil
+from typing import Iterator
 
 import pytest
 from flask.app import Flask
+from spiffworkflow_backend.services.process_model_service import ProcessModelService
 
 
 # We need to call this before importing spiffworkflow_backend
@@ -32,3 +35,14 @@ def app() -> Flask:
     )
 
     return app
+
+
+@pytest.fixture()
+def with_bpmn_file_cleanup() -> Iterator[None]:
+    """Process_group_resource."""
+    try:
+        yield
+    finally:
+        process_model_service = ProcessModelService()
+        if os.path.exists(process_model_service.root_path()):
+            shutil.rmtree(process_model_service.root_path())
diff --git a/src/spiffworkflow_backend/config/__init__.py b/src/spiffworkflow_backend/config/__init__.py
index d3f20a88..05b76213 100644
--- a/src/spiffworkflow_backend/config/__init__.py
+++ b/src/spiffworkflow_backend/config/__init__.py
@@ -6,7 +6,7 @@ from flask.app import Flask
 from werkzeug.utils import ImportStringError
 
 
-def setup_logger_for_sql_statements(app: Flask):
+def setup_logger_for_sql_statements(app: Flask) -> None:
     """Setup_logger_for_sql_statements."""
     db_log_file_name = f"log/db_{app.env}.log"
     db_handler_log_level = logging.INFO
@@ -19,7 +19,7 @@ def setup_logger_for_sql_statements(app: Flask):
     db_logger.setLevel(db_logger_log_level)
 
 
-def setup_database_uri(app: Flask):
+def setup_database_uri(app: Flask) -> None:
     """Setup_database_uri."""
     if os.environ.get("DATABASE_URI") is None:
         if os.environ.get("SPIFF_DATABASE_TYPE") == "sqlite":
diff --git a/src/spiffworkflow_backend/models/file.py b/src/spiffworkflow_backend/models/file.py
index ab8087e4..ebd02355 100644
--- a/src/spiffworkflow_backend/models/file.py
+++ b/src/spiffworkflow_backend/models/file.py
@@ -12,6 +12,7 @@ from sqlalchemy.orm import deferred
 from sqlalchemy.orm import relationship
 
 from spiffworkflow_backend.models.data_store import DataStoreModel
+from datetime import datetime
 
 
 class FileModel(SpiffworkflowBaseDBModel):
@@ -109,14 +110,14 @@ class File:
     process_group_id: Optional[str] = None
     archived: bool = False
 
-    def __post_init__(self):
+    def __post_init__(self) -> None:
         """__post_init__."""
         self.sort_index = f"{self.type}:{self.name}"
 
     @classmethod
     def from_file_system(
-        cls, file_name, file_type, content_type, last_modified, file_size
-    ):
+        cls, file_name: str, file_type: FileType, content_type: str, last_modified: datetime, file_size: int
+    ) -> "File":
         """From_file_system."""
         instance = cls(
             name=file_name,
diff --git a/src/spiffworkflow_backend/models/process_group.py b/src/spiffworkflow_backend/models/process_group.py
index 1772a00c..08b0d2ab 100644
--- a/src/spiffworkflow_backend/models/process_group.py
+++ b/src/spiffworkflow_backend/models/process_group.py
@@ -1,7 +1,7 @@
 """Process_group."""
 from dataclasses import dataclass
 from dataclasses import field
-from typing import Optional
+from typing import Dict, Union, Optional
 
 import marshmallow
 from marshmallow import post_load
@@ -22,7 +22,7 @@ class ProcessGroup:
     admin: Optional[bool] = False
     process_models: Optional[list[ProcessModelInfo]] = field(default_factory=list)
 
-    def __post_init__(self):
+    def __post_init__(self) -> None:
         """__post_init__."""
         self.sort_index = f"{self.display_order}:{self.id}"
 
@@ -51,6 +51,6 @@ class ProcessGroupSchema(Schema):
     )
 
     @post_load
-    def make_process_group(self, data, **kwargs):
+    def make_process_group(self, data: Dict[str, Union[str, bool, int]], **kwargs) -> ProcessGroup:
         """Make_process_group."""
         return ProcessGroup(**data)
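For context, a minimal sketch of how a test opts into the shared fixtures now that `with_bpmn_file_cleanup` lives in `conftest.py`. The test name and process group values below are invented; the fixture is requested purely for its cleanup side effect.

```python
from flask.app import Flask

from spiffworkflow_backend.models.process_group import ProcessGroup
from spiffworkflow_backend.services.process_model_service import ProcessModelService


def test_example_group_roundtrip(app: Flask, with_bpmn_file_cleanup: None) -> None:
    """Hypothetical test: conftest.py supplies the app and removes the BPMN dir afterwards."""
    process_model_service = ProcessModelService()
    process_model_service.add_process_group(ProcessGroup(id="example", display_name="Example"))
    assert process_model_service.get_process_group("example") is not None
```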
diff --git a/src/spiffworkflow_backend/models/process_instance.py b/src/spiffworkflow_backend/models/process_instance.py
index ef766607..5accfbae 100644
--- a/src/spiffworkflow_backend/models/process_instance.py
+++ b/src/spiffworkflow_backend/models/process_instance.py
@@ -15,6 +15,7 @@ from sqlalchemy.orm import relationship
 from spiffworkflow_backend.models.process_model import ProcessModelInfo
 from spiffworkflow_backend.models.task import TaskSchema
 from spiffworkflow_backend.models.user import UserModel
+from typing import Dict, Union
 
 
 class NavigationItemSchema(Schema):
@@ -87,7 +88,7 @@ class ProcessInstanceModel(SpiffworkflowBaseDBModel):  # type: ignore
     status = db.Column(db.Enum(ProcessInstanceStatus))
 
     @property
-    def serialized(self):
+    def serialized(self) -> Dict[str, Union[int, str]]:
         """Return object data in serializeable format."""
         return {
             "id": self.id,
@@ -106,17 +107,17 @@ class ProcessInstanceApi:
 
     def __init__(
         self,
-        id,
-        status,
-        next_task,
-        process_model_identifier,
-        process_group_identifier,
-        total_tasks,
-        completed_tasks,
-        updated_at_in_seconds,
-        is_review,
-        title,
-    ):
+        id: int,
+        status: ProcessInstanceStatus,
+        next_task: None,
+        process_model_identifier: str,
+        process_group_identifier: str,
+        total_tasks: int,
+        completed_tasks: int,
+        updated_at_in_seconds: int,
+        is_review: bool,
+        title: str,
+    ) -> None:
         """__init__."""
         self.id = id
         self.status = status
diff --git a/src/spiffworkflow_backend/models/process_model.py b/src/spiffworkflow_backend/models/process_model.py
index c11f68e1..ad6b3dcb 100644
--- a/src/spiffworkflow_backend/models/process_model.py
+++ b/src/spiffworkflow_backend/models/process_model.py
@@ -1,7 +1,7 @@
 """Process_model."""
 from dataclasses import dataclass
 from dataclasses import field
-from typing import Optional
+from typing import Dict, Union, Optional
 
 import marshmallow
 from marshmallow import post_load
@@ -28,7 +28,7 @@ class ProcessModelInfo:
     is_review: Optional[bool] = False
     files: Optional[list[str]] = field(default_factory=list)
 
-    def __post_init__(self):
+    def __post_init__(self) -> None:
         """__post_init__."""
         self.sort_index = f"{self.process_group_id}:{self.id}"
 
@@ -64,6 +64,6 @@ class ProcessModelInfoSchema(Schema):
     files = marshmallow.fields.List(marshmallow.fields.Nested("FileSchema"))
 
     @post_load
-    def make_spec(self, data, **kwargs):
+    def make_spec(self, data: Dict[str, Union[str, bool, int]], **kwargs) -> ProcessModelInfo:
         """Make_spec."""
         return ProcessModelInfo(**data)
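The `@post_load` hooks above are what turn validated dictionaries back into dataclasses, which is why their return annotations are `ProcessGroup` / `ProcessModelInfo` rather than `dict`. A rough round-trip sketch with invented field values; whether every dumped field loads back cleanly depends on schema fields not shown in this diff.

```python
from spiffworkflow_backend.models.process_group import ProcessGroup, ProcessGroupSchema

group = ProcessGroup(id="example", display_name="Example", display_order=0, admin=False)
data = ProcessGroupSchema().dump(group)   # plain, JSON-able dict
loaded = ProcessGroupSchema().load(data)  # make_process_group() runs here
assert isinstance(loaded, ProcessGroup)   # a dataclass again, sort_index recomputed in __post_init__
```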
diff --git a/src/spiffworkflow_backend/routes/process_api_blueprint.py b/src/spiffworkflow_backend/routes/process_api_blueprint.py
index 3035d657..1c17ad9a 100644
--- a/src/spiffworkflow_backend/routes/process_api_blueprint.py
+++ b/src/spiffworkflow_backend/routes/process_api_blueprint.py
@@ -23,6 +23,9 @@ from spiffworkflow_backend.services.process_instance_service import (
 )
 from spiffworkflow_backend.services.process_model_service import ProcessModelService
 from spiffworkflow_backend.services.spec_file_service import SpecFileService
+import flask.wrappers
+from typing import Dict, List, Optional, Union
+from werkzeug.datastructures import FileStorage
 
 # from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflowSerializer  # type: ignore
 # from SpiffWorkflow.camunda.serializer.task_spec_converters import UserTaskConverter  # type: ignore
@@ -31,7 +34,7 @@ from spiffworkflow_backend.services.spec_file_service import SpecFileService
 process_api_blueprint = Blueprint("process_api", __name__)
 
 
-def process_group_add(body):
+def process_group_add(body: Dict[str, Union[str, bool, int]]) -> flask.wrappers.Response:
     """Add_process_group."""
     # just so the import is used. oh, and it's imported because spiffworkflow_backend/unit/test_permissions.py
     # depends on it, and otherwise flask migrations won't include it in the list of database tables.
@@ -47,20 +50,20 @@
     )
 
 
-def process_group_delete(process_group_id):
+def process_group_delete(process_group_id: str) -> flask.wrappers.Response:
     """Process_group_delete."""
     ProcessModelService().process_group_delete(process_group_id)
     return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")
 
 
-def process_group_update(process_group_id, body):
+def process_group_update(process_group_id: str, body: Dict[str, Union[str, bool, int]]) -> Dict[str, Union[str, bool, int]]:
     """Process Group Update."""
     process_group = ProcessGroupSchema().load(body)
     ProcessModelService().update_process_group(process_group)
     return ProcessGroupSchema().dump(process_group)
 
 
-def process_groups_list(page=1, per_page=100):
+def process_groups_list(page: int=1, per_page: int=100) -> flask.wrappers.Response:
     """Process_groups_list."""
     process_groups = sorted(ProcessModelService().get_process_groups())
     batch = ProcessModelService().get_batch(process_groups, page, per_page)
@@ -79,13 +82,13 @@
     return Response(json.dumps(response_json), status=200, mimetype="application/json")
 
 
-def process_group_show(process_group_id):
+def process_group_show(process_group_id: str) -> Dict[str, Union[List[Dict[str, Union[str, bool, int]]], str, bool, int]]:
     """Process_group_show."""
     process_group = ProcessModelService().get_process_group(process_group_id)
     return ProcessGroupSchema().dump(process_group)
 
 
-def process_model_add(body):
+def process_model_add(body: Dict[str, Union[str, bool, int]]) -> flask.wrappers.Response:
     """Add_process_model."""
     process_model_info = ProcessModelInfoSchema().load(body)
     process_model_service = ProcessModelService()
@@ -104,20 +107,20 @@
     )
 
 
-def process_model_delete(process_group_id, process_model_id):
+def process_model_delete(process_group_id: str, process_model_id: str) -> flask.wrappers.Response:
     """Process_model_delete."""
     ProcessModelService().process_model_delete(process_model_id)
     return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")
 
 
-def process_model_update(process_group_id, process_model_id, body):
+def process_model_update(process_group_id: str, process_model_id: str, body: Dict[str, Union[str, bool, int]]) -> Dict[str, Union[str, bool, int]]:
     """Process_model_update."""
     process_model = ProcessModelInfoSchema().load(body)
     ProcessModelService().update_spec(process_model)
     return ProcessModelInfoSchema().dump(process_model)
 
 
-def process_model_show(process_group_id, process_model_id):
+def process_model_show(process_group_id: str, process_model_id: str) -> Dict[str, Union[str, List[Dict[str, Optional[Union[str, int, bool]]]], bool, int]]:
     """Process_model_show."""
     process_model = ProcessModelService().get_process_model(
         process_model_id, group_id=process_group_id
     )
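The JSON-producing handlers in this module build a `flask.wrappers.Response` directly, which is what the new return annotations describe. A self-contained sketch of that pattern with an invented handler name; in the real blueprint, connexion parses the request and supplies `body`.

```python
import json
from typing import Dict, Union

import flask.wrappers
from flask import Response


def example_handler(body: Dict[str, Union[str, bool, int]]) -> flask.wrappers.Response:
    """Invented handler: mirrors the Response construction used throughout this module."""
    # mypy can now verify that every code path produces a Response object.
    payload = {"ok": True, "received_keys": sorted(body.keys())}
    return Response(json.dumps(payload), status=201, mimetype="application/json")
```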
@@ -137,7 +140,7 @@
     return process_model_json
 
 
-def process_model_list(process_group_id, page=1, per_page=100):
+def process_model_list(process_group_id: str, page: int=1, per_page: int=100) -> flask.wrappers.Response:
     """Process model list!"""
     process_models = sorted(ProcessModelService().get_process_models(process_group_id))
     batch = ProcessModelService().get_batch(
@@ -159,7 +162,7 @@
     return Response(json.dumps(response_json), status=200, mimetype="application/json")
 
 
-def get_file(process_group_id, process_model_id, file_name):
+def get_file(process_group_id: str, process_model_id: str, file_name: str) -> Dict[str, Optional[Union[str, int, bool]]]:
     """Get_file."""
     process_model = ProcessModelService().get_process_model(
         process_model_id, group_id=process_group_id
     )
@@ -181,7 +184,7 @@
     return FileSchema().dump(file)
 
 
-def process_model_file_update(process_group_id, process_model_id, file_name):
+def process_model_file_update(process_group_id: str, process_model_id: str, file_name: str) -> flask.wrappers.Response:
     """Process_model_file_save."""
     process_model = ProcessModelService().get_process_model(
         process_model_id, group_id=process_group_id
@@ -200,7 +203,7 @@
     return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")
 
 
-def add_file(process_group_id, process_model_id):
+def add_file(process_group_id: str, process_model_id: str) -> flask.wrappers.Response:
     """Add_file."""
     process_model_service = ProcessModelService()
     process_model = process_model_service.get_process_model(
@@ -222,7 +225,7 @@
     )
 
 
-def process_instance_create(process_group_id, process_model_id):
+def process_instance_create(process_group_id: str, process_model_id: str) -> flask.wrappers.Response:
     """Create_process_instance."""
     process_instance = ProcessInstanceService.create_process_instance(
         process_model_id, g.user, process_group_identifier=process_group_id
@@ -244,7 +247,7 @@
     )
 
 
-def process_instance_list(process_group_id, process_model_id, page=1, per_page=100):
+def process_instance_list(process_group_id: str, process_model_id: str, page: int=1, per_page: int=100) -> flask.wrappers.Response:
     """Process_instance_list."""
     process_model = ProcessModelService().get_process_model(
         process_model_id, group_id=process_group_id
@@ -302,7 +305,7 @@
     return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")
 
 
-def process_instance_report(process_group_id, process_model_id, page=1, per_page=100):
+def process_instance_report(process_group_id: str, process_model_id: str, page: int=1, per_page: int=100) -> flask.wrappers.Response:
     """Process_instance_list."""
     process_model = ProcessModelService().get_process_model(
         process_model_id, group_id=process_group_id
@@ -344,7 +347,7 @@
     return Response(json.dumps(response_json), status=200, mimetype="application/json")
 
 
-def get_file_from_request():
+def get_file_from_request() -> FileStorage:
     """Get_file_from_request."""
     request_file = connexion.request.files.get("file")
    if not request_file:
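`add_file` and `get_file_from_request` read the multipart field named `file` via connexion and hand back a werkzeug `FileStorage`. A hedged test-client sketch of an upload; the URL and expected status are illustrative only, since the actual routes live in the API definition, not in this diff.

```python
import io

from flask.testing import FlaskClient


def upload_example_bpmn(client: FlaskClient) -> None:
    """Hypothetical helper: posts a file under the multipart field name "file"."""
    response = client.post(
        "/v1.0/process-models/example-group/example-model/files",  # invented URL
        data={"file": (io.BytesIO(b"<definitions/>"), "example.bpmn")},
        content_type="multipart/form-data",
    )
    assert response.status_code < 400  # exact code depends on the API definition
```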
diff --git a/src/spiffworkflow_backend/routes/user.py b/src/spiffworkflow_backend/routes/user.py
index c7eea67e..ce387fc5 100644
--- a/src/spiffworkflow_backend/routes/user.py
+++ b/src/spiffworkflow_backend/routes/user.py
@@ -1,4 +1,6 @@
 """User."""
+from typing import Dict, Optional
+
 from flask import current_app
 from flask import g
 from flask_bpmn.api.api_error import ApiError
@@ -11,7 +13,7 @@ from spiffworkflow_backend.models.user import UserModel
 """
 
-def verify_token(token=None):
+def verify_token(token: Optional[str]=None) -> Dict[str, None]:
     """Verify the token for the user (if provided).
 
     If in production environment and token is not provided, gets user from the SSO headers
     and returns their token.
diff --git a/src/spiffworkflow_backend/services/file_system_service.py b/src/spiffworkflow_backend/services/file_system_service.py
index 86ac3fb1..24f3eced 100644
--- a/src/spiffworkflow_backend/services/file_system_service.py
+++ b/src/spiffworkflow_backend/services/file_system_service.py
@@ -1,7 +1,7 @@
 """File_system_service."""
 import os
 from datetime import datetime
-from typing import List
+from typing import Optional, List
 
 import pytz
 from flask import current_app
@@ -11,6 +11,7 @@ from spiffworkflow_backend.models.file import CONTENT_TYPES
 from spiffworkflow_backend.models.file import File
 from spiffworkflow_backend.models.file import FileType
 from spiffworkflow_backend.models.process_model import ProcessModelInfo
+from posix import DirEntry
 
 
 class FileSystemService:
@@ -28,7 +29,7 @@
     WF_JSON_FILE = "workflow.json"
 
     @staticmethod
-    def root_path():
+    def root_path() -> str:
         """Root_path."""
         # fixme: allow absolute files
         dir_name = current_app.config["BPMN_SPEC_ABSOLUTE_DIR"]
@@ -36,7 +37,7 @@ class FileSystemService:
         return os.path.join(app_root, "..", dir_name)
 
     @staticmethod
-    def process_group_path(name: str):
+    def process_group_path(name: str) -> str:
         """Category_path."""
         return os.path.join(FileSystemService.root_path(), name)
 
@@ -48,7 +49,7 @@
         )
 
     @staticmethod
-    def process_group_path_for_spec(spec):
+    def process_group_path_for_spec(spec: ProcessModelInfo) -> str:
         """Category_path_for_spec."""
         if spec.is_master_spec:
             return os.path.join(FileSystemService.root_path())
@@ -67,7 +68,7 @@
         return process_group_path
 
     @staticmethod
-    def workflow_path(spec: ProcessModelInfo):
+    def workflow_path(spec: ProcessModelInfo) -> str:
         """Workflow_path."""
         if spec.is_master_spec:
             return os.path.join(
@@ -77,7 +78,7 @@
         process_group_path = FileSystemService.process_group_path_for_spec(spec)
         return os.path.join(process_group_path, spec.id)
 
-    def next_display_order(self, spec):
+    def next_display_order(self, spec: ProcessModelInfo) -> int:
         """Next_display_order."""
         path = self.process_group_path_for_spec(spec)
         if os.path.exists(path):
@@ -86,14 +87,14 @@
         return 0
 
     @staticmethod
-    def write_file_data_to_system(file_path: str, file_data):
+    def write_file_data_to_system(file_path: str, file_data: bytes) -> None:
         """Write_file_data_to_system."""
         os.makedirs(os.path.dirname(file_path), exist_ok=True)
         with open(file_path, "wb") as f_handle:
             f_handle.write(file_data)
 
     @staticmethod
-    def get_extension(file_name) -> str:
+    def get_extension(file_name: str) -> str:
         """Get_extension."""
         _, file_extension = os.path.splitext(file_name)
         return file_extension.lower().strip()[1:]
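A quick illustration of the two static helpers typed in this file; it assumes the `FileType` enum has a `bpmn` member, as the `FileType[extension]` lookup implies, and the file names are made up.

```python
from spiffworkflow_backend.models.file import FileType
from spiffworkflow_backend.services.file_system_service import FileSystemService

# get_extension() drops the dot and lower-cases: "Sample_Process.BPMN" -> "bpmn".
assert FileSystemService.get_extension("Sample_Process.BPMN") == "bpmn"
# file_type() then resolves that string through the FileType enum by name.
assert FileSystemService.file_type("sample_process.bpmn") == FileType["bpmn"]
```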
@@ -125,13 +126,13 @@
         return aware_utc_dt
 
     @staticmethod
-    def file_type(file_name):
+    def file_type(file_name: str) -> FileType:
         """File_type."""
         extension = FileSystemService.get_extension(file_name)
         return FileType[extension]
 
     @staticmethod
-    def _get_files(file_path: str, file_name=None) -> List[File]:
+    def _get_files(file_path: str, file_name: Optional[str]=None) -> List[File]:
         """Returns an array of File objects at the given path, can be restricted to just one file."""
         files = []
         items = os.scandir(file_path)
@@ -160,7 +161,7 @@
         return file
 
     @staticmethod
-    def to_file_object_from_dir_entry(item: os.DirEntry):
+    def to_file_object_from_dir_entry(item: os.DirEntry) -> File:
         """To_file_object_from_dir_entry."""
         extension = FileSystemService.get_extension(item.name)
         try:
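The path helpers above resolve everything relative to `root_path()`, which reads `BPMN_SPEC_ABSOLUTE_DIR` from the Flask config, so they only work inside an application context. A sketch under that assumption; the group, model and file names are invented.

```python
import os

from flask.app import Flask

from spiffworkflow_backend.services.file_system_service import FileSystemService


def write_example_bpmn(app: Flask) -> str:
    """Hypothetical helper: writes placeholder bytes under the configured BPMN dir."""
    with app.app_context():
        group_dir = FileSystemService.process_group_path("example_group")
        target = os.path.join(group_dir, "example_model", "example.bpmn")
        # write_file_data_to_system() creates the parent directories itself.
        FileSystemService.write_file_data_to_system(target, b"<definitions/>")
        return target
```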
""" - def evaluate(self, task, expression): + def evaluate(self, task: Task, expression: str) -> str: """Evaluate.""" return self._evaluate(expression, task.data, task) - def _evaluate(self, expression, context, task=None, external_methods=None): + def _evaluate(self, expression: str, context: Dict[str, Union[Box, str]], task: Optional[Task]=None, external_methods: None=None) -> str: """Evaluate the given expression, within the context of the given task and return the result.""" try: return super()._evaluate(expression, context, task, {}) @@ -62,7 +64,7 @@ class CustomBpmnScriptEngine(PythonScriptEngine): "'%s', %s" % (expression, str(exception)), ) from exception - def execute(self, task: SpiffTask, script, data): + def execute(self, task: SpiffTask, script: str, data: Dict[str, Dict[str, str]]) -> None: """Execute.""" try: super().execute(task, script, data) @@ -93,8 +95,8 @@ class ProcessInstanceProcessor: VALIDATION_PROCESS_KEY = "validate_only" def __init__( - self, process_instance_model: ProcessInstanceModel, validate_only=False - ): + self, process_instance_model: ProcessInstanceModel, validate_only: bool=False + ) -> None: """Create a Workflow Processor based on the serialized information available in the process_instance model.""" self.process_instance_model = process_instance_model self.process_model_service = ProcessModelService() @@ -200,7 +202,7 @@ class ProcessInstanceProcessor: ) from ke @staticmethod - def add_user_info_to_process_instance(bpmn_process_instance): + def add_user_info_to_process_instance(bpmn_process_instance: BpmnWorkflow) -> None: """Add_user_info_to_process_instance.""" if UserService.has_user(): current_user = UserService.current_user(allow_admin_impersonate=True) @@ -327,7 +329,7 @@ class ProcessInstanceProcessor: ] = validate_only return bpmn_process_instance - def save(self): + def save(self) -> None: """Saves the current state of this processor to the database.""" self.process_instance_model.bpmn_json = self.serialize() complete_states = [TaskState.CANCELLED, TaskState.COMPLETED] @@ -381,13 +383,13 @@ class ProcessInstanceProcessor: return bpmn_process_instance.last_task.data @staticmethod - def get_parser(): + def get_parser() -> MyCustomParser: """Get_parser.""" parser = MyCustomParser() return parser @staticmethod - def get_spec(files: List[File], process_model_info: ProcessModelInfo): + def get_spec(files: List[File], process_model_info: ProcessModelInfo) -> BpmnProcessSpec: """Returns a SpiffWorkflow specification for the given process_instance spec, using the files provided.""" parser = ProcessInstanceProcessor.get_parser() @@ -424,7 +426,7 @@ class ProcessInstanceProcessor: return spec @staticmethod - def status_of(bpmn_process_instance): + def status_of(bpmn_process_instance: BpmnWorkflow) -> ProcessInstanceStatus: """Status_of.""" if bpmn_process_instance.is_completed(): return ProcessInstanceStatus.complete @@ -437,11 +439,11 @@ class ProcessInstanceProcessor: else: return ProcessInstanceStatus.waiting - def get_status(self): + def get_status(self) -> ProcessInstanceStatus: """Get_status.""" return self.status_of(self.bpmn_process_instance) - def do_engine_steps(self, exit_at=None): + def do_engine_steps(self, exit_at: None=None) -> None: """Do_engine_steps.""" try: self.bpmn_process_instance.refresh_waiting_tasks() @@ -464,7 +466,7 @@ class ProcessInstanceProcessor: except WorkflowTaskExecException as we: raise ApiError.from_workflow_exception("task_error", str(we), we) from we - def serialize(self): + def serialize(self) -> str: 
"""Serialize.""" return self._serializer.serialize_json(self.bpmn_process_instance) @@ -472,7 +474,7 @@ class ProcessInstanceProcessor: """Next_user_tasks.""" return self.bpmn_process_instance.get_ready_user_tasks() - def next_task(self): + def next_task(self) -> Task: """Returns the next task that should be completed even if there are parallel tasks and multiple options are available. If the process_instance is complete @@ -541,7 +543,7 @@ class ProcessInstanceProcessor: next_task = task return next_task - def completed_user_tasks(self): + def completed_user_tasks(self) -> List[Any]: """Completed_user_tasks.""" completed_user_tasks = self.bpmn_process_instance.get_tasks(TaskState.COMPLETED) completed_user_tasks.reverse() @@ -567,7 +569,7 @@ class ProcessInstanceProcessor: """Get_data.""" return self.bpmn_process_instance.data - def get_process_instance_id(self): + def get_process_instance_id(self) -> int: """Get_process_instance_id.""" return self.process_instance_model.id @@ -594,7 +596,7 @@ class ProcessInstanceProcessor: additional_tasks.append(child) return ready_tasks + additional_tasks - def get_all_user_tasks(self): + def get_all_user_tasks(self) -> List[Union[Task, Any]]: """Get_all_user_tasks.""" all_tasks = self.bpmn_process_instance.get_tasks(TaskState.ANY_MASK) return [ diff --git a/src/spiffworkflow_backend/services/process_instance_service.py b/src/spiffworkflow_backend/services/process_instance_service.py index 3a8136cc..f7e4a788 100644 --- a/src/spiffworkflow_backend/services/process_instance_service.py +++ b/src/spiffworkflow_backend/services/process_instance_service.py @@ -1,6 +1,6 @@ """Process_instance_service.""" import time -from typing import List +from typing import Any, Dict, Optional, List from flask import current_app from flask_bpmn.models.db import db @@ -19,6 +19,8 @@ from spiffworkflow_backend.services.process_instance_processor import ( ) from spiffworkflow_backend.services.process_model_service import ProcessModelService from spiffworkflow_backend.services.user_service import UserService +from SpiffWorkflow.task import Task +from spiffworkflow_backend.models.user import UserModel class ProcessInstanceService: @@ -28,8 +30,8 @@ class ProcessInstanceService: @staticmethod def create_process_instance( - process_model_identifier, user, process_group_identifier=None - ): + process_model_identifier: str, user: UserModel, process_group_identifier: Optional[str]=None + ) -> ProcessInstanceModel: """Get_process_instance_from_spec.""" process_instance_model = ProcessInstanceModel( status=ProcessInstanceStatus.not_started, @@ -44,8 +46,8 @@ class ProcessInstanceService: @staticmethod def processor_to_process_instance_api( - processor: ProcessInstanceProcessor, next_task=None - ): + processor: ProcessInstanceProcessor, next_task: None=None + ) -> ProcessInstanceApi: """Returns an API model representing the state of the current process_instance. If requested, and possible, next_task is set to the current_task. 
diff --git a/src/spiffworkflow_backend/services/process_instance_service.py b/src/spiffworkflow_backend/services/process_instance_service.py
index 3a8136cc..f7e4a788 100644
--- a/src/spiffworkflow_backend/services/process_instance_service.py
+++ b/src/spiffworkflow_backend/services/process_instance_service.py
@@ -1,6 +1,6 @@
 """Process_instance_service."""
 import time
-from typing import List
+from typing import Any, Dict, Optional, List
 
 from flask import current_app
 from flask_bpmn.models.db import db
@@ -19,6 +19,8 @@ from spiffworkflow_backend.services.process_instance_processor import (
 )
 from spiffworkflow_backend.services.process_model_service import ProcessModelService
 from spiffworkflow_backend.services.user_service import UserService
+from SpiffWorkflow.task import Task
+from spiffworkflow_backend.models.user import UserModel
 
 
 class ProcessInstanceService:
@@ -28,8 +30,8 @@ class ProcessInstanceService:
 
     @staticmethod
     def create_process_instance(
-        process_model_identifier, user, process_group_identifier=None
-    ):
+        process_model_identifier: str, user: UserModel, process_group_identifier: Optional[str]=None
+    ) -> ProcessInstanceModel:
         """Get_process_instance_from_spec."""
         process_instance_model = ProcessInstanceModel(
             status=ProcessInstanceStatus.not_started,
@@ -44,8 +46,8 @@ class ProcessInstanceService:
 
     @staticmethod
     def processor_to_process_instance_api(
-        processor: ProcessInstanceProcessor, next_task=None
-    ):
+        processor: ProcessInstanceProcessor, next_task: None=None
+    ) -> ProcessInstanceApi:
         """Returns an API model representing the state of the current process_instance.
 
         If requested, and possible, next_task is set to the current_task.
@@ -119,7 +121,7 @@ class ProcessInstanceService:
             ProcessInstanceService.update_navigation(nav_item.children, processor)
 
     @staticmethod
-    def get_previously_submitted_data(process_instance_id, spiff_task):
+    def get_previously_submitted_data(process_instance_id: int, spiff_task: Task) -> Dict[Any, Any]:
         """If the user has completed this task previously, find the form data for the last submission."""
         query = (
             db.session.query(TaskEventModel)
diff --git a/src/spiffworkflow_backend/services/process_model_service.py b/src/spiffworkflow_backend/services/process_model_service.py
index aa971e87..e35e7e15 100644
--- a/src/spiffworkflow_backend/services/process_model_service.py
+++ b/src/spiffworkflow_backend/services/process_model_service.py
@@ -2,7 +2,7 @@
 import json
 import os
 import shutil
-from typing import List
+from typing import Any, Optional, Union, List
 
 from flask_bpmn.api.api_error import ApiError
 
@@ -27,19 +27,19 @@ class ProcessModelService(FileSystemService):
     WF_SCHEMA = ProcessModelInfoSchema()
 
     @staticmethod
-    def get_batch(items, page=1, per_page=10):
+    def get_batch(items: List[Union[Any, ProcessGroup, ProcessModelInfo]], page: int=1, per_page: int=10) -> List[Union[Any, ProcessGroup, ProcessModelInfo]]:
         """Get_batch."""
         start = (page - 1) * per_page
         end = start + per_page
         return items[start:end]
 
-    def add_spec(self, spec: ProcessModelInfo):
+    def add_spec(self, spec: ProcessModelInfo) -> None:
         """Add_spec."""
         display_order = self.next_display_order(spec)
         spec.display_order = display_order
         self.update_spec(spec)
 
-    def update_spec(self, spec: ProcessModelInfo):
+    def update_spec(self, spec: ProcessModelInfo) -> None:
         """Update_spec."""
         spec_path = self.workflow_path(spec)
         if spec.is_master_spec or spec.library or spec.standalone:
@@ -49,7 +49,7 @@ class ProcessModelService(FileSystemService):
         with open(json_path, "w") as wf_json:
             json.dump(self.WF_SCHEMA.dump(spec), wf_json, indent=4)
 
-    def process_model_delete(self, process_model_id: str):
+    def process_model_delete(self, process_model_id: str) -> None:
         """Delete Procecss Model."""
         instances = ProcessInstanceModel.query.filter(
             ProcessInstanceModel.process_model_identifier == process_model_id
@@ -79,7 +79,7 @@ class ProcessModelService(FileSystemService):
         """Master_spec."""
         return self.get_master_spec()
 
-    def get_master_spec(self):
+    def get_master_spec(self) -> None:
         """Get_master_spec."""
         path = os.path.join(
             FileSystemService.root_path(), FileSystemService.MASTER_SPECIFICATION
@@ -87,7 +87,7 @@ class ProcessModelService(FileSystemService):
         if os.path.exists(path):
             return self.__scan_spec(path, FileSystemService.MASTER_SPECIFICATION)
 
-    def get_process_model(self, process_model_id, group_id=None):
+    def get_process_model(self, process_model_id: str, group_id: Optional[str]=None) -> Optional[ProcessModelInfo]:
         """Get a process model from a model and group id."""
         if not os.path.exists(FileSystemService.root_path()):
             return  # Nothing to scan yet. There are no files.
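A tiny illustration of the slicing behavior the new `get_batch` annotation describes; plain strings stand in for process groups or models here, which the `Union[Any, ...]` element type permits.

```python
from spiffworkflow_backend.services.process_model_service import ProcessModelService

# get_batch() is plain slice arithmetic: page 2 with per_page=3 returns items[3:6].
items = ["a", "b", "c", "d", "e", "f", "g"]
assert ProcessModelService.get_batch(items, page=1, per_page=3) == ["a", "b", "c"]
assert ProcessModelService.get_batch(items, page=2, per_page=3) == ["d", "e", "f"]
assert ProcessModelService.get_batch(items, page=3, per_page=3) == ["g"]
```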
@@ -112,7 +112,7 @@ class ProcessModelService(FileSystemService):
             )
         return self.__scan_spec(sd.path, sd.name, process_group)
 
-    def get_process_models(self, process_group_id=None):
+    def get_process_models(self, process_group_id: Optional[str]=None) -> List[ProcessModelInfo]:
         """Get process models."""
         if process_group_id is None:
             process_groups = self.get_process_groups()
@@ -143,7 +143,7 @@ class ProcessModelService(FileSystemService):
             )
         return self.cleanup_workflow_spec_display_order(spec.process_group)
 
-    def cleanup_workflow_spec_display_order(self, process_group):
+    def cleanup_workflow_spec_display_order(self, process_group: ProcessGroup) -> List[Union[Any, ProcessModelInfo]]:
         """Cleanup_workflow_spec_display_order."""
         index = 0
         if not process_group:
@@ -174,7 +174,7 @@ class ProcessModelService(FileSystemService):
             return []
         return process_group.process_models
 
-    def get_process_group(self, process_group_id):
+    def get_process_group(self, process_group_id: str) -> Optional[ProcessGroup]:
         """Look for a given process_group, and return it."""
         if not os.path.exists(FileSystemService.root_path()):
             return  # Nothing to scan yet. There are no files.
@@ -183,13 +183,13 @@ class ProcessModelService(FileSystemService):
             if item.is_dir() and item.name == process_group_id:
                 return self.__scan_process_group(item)
 
-    def add_process_group(self, process_group: ProcessGroup):
+    def add_process_group(self, process_group: ProcessGroup) -> ProcessGroup:
         """Add_process_group."""
         display_order = len(self.get_process_groups())
         process_group.display_order = display_order
         return self.update_process_group(process_group)
 
-    def update_process_group(self, process_group: ProcessGroup):
+    def update_process_group(self, process_group: ProcessGroup) -> ProcessGroup:
         """Update_process_group."""
         cat_path = self.process_group_path(process_group.id)
         os.makedirs(cat_path, exist_ok=True)
@@ -198,7 +198,7 @@ class ProcessModelService(FileSystemService):
             json.dump(self.GROUP_SCHEMA.dump(process_group), cat_json, indent=4)
         return process_group
 
-    def process_group_delete(self, process_group_id: str):
+    def process_group_delete(self, process_group_id: str) -> None:
         """Delete_process_group."""
         path = self.process_group_path(process_group_id)
         if os.path.exists(path):
@@ -228,7 +228,7 @@ class ProcessModelService(FileSystemService):
             index += 1
         return process_groups
 
-    def cleanup_process_group_display_order(self):
+    def cleanup_process_group_display_order(self) -> List[Any]:
         """Cleanup_process_group_display_order."""
         process_groups = self.get_process_groups()  # Returns an ordered list
         index = 0
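A hedged sketch of the process-group lifecycle typed above, run inside an app context with an invented group id. `update_process_group` creates the group directory and writes the schema dump into a JSON file whose name comes from a constant not shown in this hunk.

```python
import os

from flask.app import Flask

from spiffworkflow_backend.models.process_group import ProcessGroup
from spiffworkflow_backend.services.process_model_service import ProcessModelService


def process_group_lifecycle_demo(app: Flask) -> None:
    """Hypothetical walk-through of add/update/delete for a process group."""
    with app.app_context():
        service = ProcessModelService()
        group = service.add_process_group(ProcessGroup(id="sample_group", display_name="Sample"))
        group_dir = service.process_group_path(group.id)
        assert os.path.isdir(group_dir)  # update_process_group() ran makedirs and wrote the JSON dump
        service.process_group_delete("sample_group")  # removes the directory tree again
```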
diff --git a/src/spiffworkflow_backend/services/spec_file_service.py b/src/spiffworkflow_backend/services/spec_file_service.py
index f96fb11c..1ebc0a61 100644
--- a/src/spiffworkflow_backend/services/spec_file_service.py
+++ b/src/spiffworkflow_backend/services/spec_file_service.py
@@ -2,12 +2,12 @@
 import os
 import shutil
 from datetime import datetime
-from typing import List
+from typing import Optional, List
 from typing import Union
 
 from flask_bpmn.api.api_error import ApiError
 from lxml import etree  # type: ignore
-from lxml.etree import Element as EtreeElement  # type: ignore
+from lxml.etree import _Element, Element as EtreeElement  # type: ignore
 from SpiffWorkflow.bpmn.parser.ValidationException import ValidationException  # type: ignore
 
 from spiffworkflow_backend.models.file import File
@@ -27,9 +27,9 @@ class SpecFileService(FileSystemService):
     @staticmethod
     def get_files(
         workflow_spec: ProcessModelInfo,
-        file_name=None,
-        include_libraries=False,
-        extension_filter="",
+        file_name: Optional[str]=None,
+        include_libraries: bool=False,
+        extension_filter: str="",
     ) -> List[File]:
         """Return all files associated with a workflow specification."""
         path = SpecFileService.workflow_path(workflow_spec)
@@ -56,7 +56,7 @@ class SpecFileService(FileSystemService):
 
     @staticmethod
     def update_file(
-        workflow_spec: ProcessModelInfo, file_name: str, binary_data
+        workflow_spec: ProcessModelInfo, file_name: str, binary_data: bytes
     ) -> File:
         """Update_file."""
         SpecFileService.assert_valid_file_name(file_name)
@@ -169,7 +169,7 @@ class SpecFileService(FileSystemService):
         )
 
     @staticmethod
-    def has_swimlane(et_root: EtreeElement) -> bool:
+    def has_swimlane(et_root: _Element) -> bool:
         """Look through XML and determine if there are any lanes present that have a label."""
         elements = et_root.xpath(
             "//bpmn:lane",
@@ -182,7 +182,7 @@ class SpecFileService(FileSystemService):
         return retval
 
     @staticmethod
-    def get_process_id(et_root: EtreeElement) -> str:
+    def get_process_id(et_root: _Element) -> str:
         """Get_process_id."""
         process_elements = []
         for child in et_root:
diff --git a/tests/spiffworkflow_backend/integration/test_process_api.py b/tests/spiffworkflow_backend/integration/test_process_api.py
index c1a711fa..0254623b 100644
--- a/tests/spiffworkflow_backend/integration/test_process_api.py
+++ b/tests/spiffworkflow_backend/integration/test_process_api.py
@@ -8,7 +8,6 @@ from typing import Iterator
 from typing import Optional
 from typing import Union
 
-import pytest
 from flask.app import Flask
 from flask.testing import FlaskClient
 from flask_bpmn.models.db import db
@@ -26,17 +25,6 @@ from spiffworkflow_backend.models.process_model import ProcessModelInfoSchema
 from spiffworkflow_backend.services.process_model_service import ProcessModelService
 
 
-@pytest.fixture()
-def with_bpmn_file_cleanup() -> Iterator[None]:
-    """Process_group_resource."""
-    try:
-        yield
-    finally:
-        process_model_service = ProcessModelService()
-        if os.path.exists(process_model_service.root_path()):
-            shutil.rmtree(process_model_service.root_path())
-
-
 # phase 1: req_id: 7.1 Deploy process
 def test_process_model_add(
     app: Flask, client: FlaskClient, with_bpmn_file_cleanup: None
diff --git a/tests/spiffworkflow_backend/unit/test_process_group.py b/tests/spiffworkflow_backend/unit/test_process_group.py
index 7e5b7a09..d4a590b4 100644
--- a/tests/spiffworkflow_backend/unit/test_process_group.py
+++ b/tests/spiffworkflow_backend/unit/test_process_group.py
@@ -5,7 +5,7 @@ from spiffworkflow_backend.models.process_group import ProcessGroup
 from spiffworkflow_backend.services.process_model_service import ProcessModelService
 
 
-def test_there_is_at_least_one_group_after_we_create_one(app: Flask) -> None:
+def test_there_is_at_least_one_group_after_we_create_one(app: Flask, with_bpmn_file_cleanup: None) -> None:
     """Test_there_is_at_least_one_group_after_we_create_one."""
     process_model_service = ProcessModelService()
     process_group = ProcessGroup(id="hey", display_name="sure")