diff --git a/src/spiffworkflow_backend/models/file.py b/src/spiffworkflow_backend/models/file.py index ca2ab071..0d1dad6c 100644 --- a/src/spiffworkflow_backend/models/file.py +++ b/src/spiffworkflow_backend/models/file.py @@ -3,7 +3,7 @@ import enum from dataclasses import dataclass from dataclasses import field from datetime import datetime -from typing import Optional +from typing import Any, Optional from flask_bpmn.models.db import db from flask_bpmn.models.db import SpiffworkflowBaseDBModel @@ -64,6 +64,10 @@ class FileType(enum.Enum): xml = "xml" zip = "zip" + @classmethod + def list(cls) -> list[str]: + return [el.value for el in cls] + CONTENT_TYPES = { "bpmn": "text/xml", diff --git a/src/spiffworkflow_backend/models/process_group.py b/src/spiffworkflow_backend/models/process_group.py index 67c75579..ac6950ba 100644 --- a/src/spiffworkflow_backend/models/process_group.py +++ b/src/spiffworkflow_backend/models/process_group.py @@ -1,7 +1,9 @@ """Process_group.""" +from __future__ import annotations + from dataclasses import dataclass from dataclasses import field -from typing import Dict +from typing import Any, Dict from typing import Optional from typing import Union @@ -22,13 +24,13 @@ class ProcessGroup: display_name: str display_order: Optional[int] = 0 admin: Optional[bool] = False - process_models: Optional[list[ProcessModelInfo]] = field(default_factory=list) + process_models: Optional[list[ProcessModelInfo]] = field(default_factory=list[ProcessModelInfo]) def __post_init__(self) -> None: """__post_init__.""" self.sort_index = f"{self.display_order}:{self.id}" - def __eq__(self, other): + def __eq__(self, other: Any) -> bool: """__eq__.""" if not isinstance(other, ProcessGroup): return False @@ -54,7 +56,7 @@ class ProcessGroupSchema(Schema): @post_load def make_process_group( - self, data: Dict[str, Union[str, bool, int]], **kwargs + self, data: Dict[str, Union[str, bool, int]], **kwargs: dict ) -> ProcessGroup: """Make_process_group.""" - return ProcessGroup(**data) + return ProcessGroup(**data) # type: ignore diff --git a/src/spiffworkflow_backend/models/process_instance.py b/src/spiffworkflow_backend/models/process_instance.py index ce5f5eb1..9a94cd1e 100644 --- a/src/spiffworkflow_backend/models/process_instance.py +++ b/src/spiffworkflow_backend/models/process_instance.py @@ -1,6 +1,9 @@ """Process_instance.""" +from __future__ import annotations + +from dataclasses import dataclass import enum -from typing import Dict +from typing import Any, Dict, Optional from typing import Union import marshmallow @@ -49,7 +52,7 @@ class NavigationItemSchema(Schema): ) @marshmallow.post_load - def make_nav(self, data, **kwargs): + def make_nav(self, data: dict[str, Any], **kwargs: dict) -> NavItem: """Make_nav.""" state = data.pop("state", None) task_id = data.pop("task_id", None) @@ -165,7 +168,7 @@ class ProcessInstanceApiSchema(Schema): state = marshmallow.fields.String(allow_none=True) @marshmallow.post_load - def make_process_instance(self, data, **kwargs): + def make_process_instance(self, data: dict[str, Any], **kwargs: dict) -> ProcessInstanceApi: """Make_process_instance.""" keys = [ "id", @@ -187,59 +190,38 @@ class ProcessInstanceApiSchema(Schema): return ProcessInstanceApi(**filtered_fields) +@dataclass class ProcessInstanceMetadata: """ProcessInstanceMetadata.""" - def __init__( - self, - id, - display_name=None, - description=None, - spec_version=None, - category_id=None, - category_display_name=None, - state=None, - status: ProcessInstanceStatus = None, - total_tasks=None, - completed_tasks=None, - is_review=None, - display_order=None, - state_message=None, - process_model_identifier=None, - ): - """__init__.""" - self.id = id - self.display_name = display_name - self.description = description - self.spec_version = spec_version - self.category_id = category_id - self.category_display_name = category_display_name - self.state = state - self.state_message = state_message - self.status = status - self.total_tasks = total_tasks - self.completed_tasks = completed_tasks - self.is_review = is_review - self.display_order = display_order - self.process_model_identifier = process_model_identifier + id: int + display_name: Optional[str] = None + description: Optional[str] = None + spec_version: Optional[str] = None + state: Optional[str] = None + status: Optional[ProcessInstanceStatus] = None + total_tasks: Optional[int] = None + completed_tasks: Optional[int] = None + is_review: Optional[bool] = None + state_message: Optional[str] = None + process_model_identifier: Optional[str] = None + process_group_id: Optional[str] = None @classmethod def from_process_instance( - cls, process_instance: ProcessInstanceModel, spec: ProcessModelInfo - ): + cls, process_instance: ProcessInstanceModel, process_model: ProcessModelInfo + ) -> ProcessInstanceMetadata: """From_process_instance.""" instance = cls( id=process_instance.id, - display_name=spec.display_name, - description=spec.description, - category_id=spec.category_id, - category_display_name=spec.category.display_name, + display_name=process_model.display_name, + description=process_model.description, + process_group_id=process_model.process_group_id, state_message=process_instance.state_message, status=process_instance.status, total_tasks=process_instance.total_tasks, completed_tasks=process_instance.completed_tasks, - is_review=spec.is_review, - display_order=spec.display_order, + is_review=process_model.is_review, process_model_identifier=process_instance.process_model_identifier, ) return instance @@ -261,10 +243,8 @@ class ProcessInstanceMetadataSchema(Schema): "state", "total_tasks", "completed_tasks", - "display_order", - "category_id", + "process_group_id", "is_review", - "category_display_name", "state_message", ] unknown = INCLUDE diff --git a/src/spiffworkflow_backend/models/process_instance_report.py b/src/spiffworkflow_backend/models/process_instance_report.py index 7bcb2cf3..b6cfbf4d 100644 --- a/src/spiffworkflow_backend/models/process_instance_report.py +++ b/src/spiffworkflow_backend/models/process_instance_report.py @@ -1,4 +1,5 @@ """Process_instance.""" +from typing import Union from flask_bpmn.models.db import db from flask_bpmn.models.db import SpiffworkflowBaseDBModel from sqlalchemy import ForeignKey @@ -22,7 +23,7 @@ class ProcessInstanceReportModel(SpiffworkflowBaseDBModel): updated_at_in_seconds = db.Column(db.Integer) # type: ignore @property - def serialized(self): + def serialized(self) -> dict[str, Union[str, int]]: """Return object data in serializeable format.""" return { "id": self.id, diff --git a/src/spiffworkflow_backend/models/process_model.py b/src/spiffworkflow_backend/models/process_model.py index 82ec0589..1aae6bbb 100644 --- a/src/spiffworkflow_backend/models/process_model.py +++ b/src/spiffworkflow_backend/models/process_model.py @@ -4,7 +4,7 @@ from __future__ import annotations import enum from dataclasses import dataclass from dataclasses import field -from typing import Any +from typing import Any, Optional import marshmallow from marshmallow import Schema @@ -30,6 +30,7 @@ class ProcessModelInfo: display_name: str description: str process_group_id: str = "" + process_group: Optional[Any] = None is_master_spec: bool | None = False standalone: bool | None = False library: bool | None = False @@ -80,6 +81,6 @@ class ProcessModelInfoSchema(Schema): notification_email_on_exception = marshmallow.fields.List(marshmallow.fields.String) @post_load - def make_spec(self, data: dict[str, str | bool | int], **_) -> ProcessModelInfo: + def make_spec(self, data: dict[str, str | bool | int | NotificationType], **_: Any) -> ProcessModelInfo: """Make_spec.""" - return ProcessModelInfo(**data) + return ProcessModelInfo(**data) # type: ignore diff --git a/src/spiffworkflow_backend/models/task.py b/src/spiffworkflow_backend/models/task.py index 0893bdfa..deda5d92 100644 --- a/src/spiffworkflow_backend/models/task.py +++ b/src/spiffworkflow_backend/models/task.py @@ -1,5 +1,6 @@ """Task.""" import enum +from typing import Any import marshmallow from marshmallow import Schema @@ -92,7 +93,7 @@ class Task: lane: str, form: str, documentation: str, - data: str, + data: dict[str, Any], multi_instance_type: str, multi_instance_count: str, multi_instance_index: str, @@ -234,8 +235,7 @@ class TaskSchema(Schema): process_name = marshmallow.fields.String(required=False, allow_none=True) lane = marshmallow.fields.String(required=False, allow_none=True) - # TODO: implement - # @marshmallow.post_load - # def make_task(self, data: list[str], **_): - # """Make_task.""" - # return Task(**data) + @marshmallow.post_load + def make_task(self, data: dict[str, Any], **kwargs: dict) -> Task: + """Make_task.""" + return Task(**data) diff --git a/src/spiffworkflow_backend/models/user.py b/src/spiffworkflow_backend/models/user.py index cfe433fd..07fed48b 100644 --- a/src/spiffworkflow_backend/models/user.py +++ b/src/spiffworkflow_backend/models/user.py @@ -24,7 +24,7 @@ class UserModel(SpiffworkflowBaseDBModel): name = db.Column(db.String(50)) # type: ignore email = db.Column(db.String(50)) # type: ignore user_group_assignments = relationship(UserGroupAssignmentModel, cascade="delete") - groups = relationship( + groups = relationship( # type: ignore GroupModel, viewonly=True, secondary="user_group_assignment", diff --git a/src/spiffworkflow_backend/routes/user.py b/src/spiffworkflow_backend/routes/user.py index a35825c4..214dada1 100644 --- a/src/spiffworkflow_backend/routes/user.py +++ b/src/spiffworkflow_backend/routes/user.py @@ -14,7 +14,7 @@ from spiffworkflow_backend.models.user import UserModel """ -def verify_token(token: Optional[str] = None) -> Dict[str, None]: +def verify_token(token: Optional[str] = None) -> Dict[str, Optional[str]]: """Verify the token for the user (if provided). If in production environment and token is not provided, gets user from the SSO headers and returns their token. @@ -60,9 +60,9 @@ def verify_token(token: Optional[str] = None) -> Dict[str, None]: # If the user is valid, store the user and token for this session if db_user is not None: g.user = db_user - token = g.user.encode_auth_token() - g.token = token - token_info = UserModel.decode_auth_token(token) + token_from_user = g.user.encode_auth_token() + g.token = token_from_user + token_info = UserModel.decode_auth_token(token_from_user) return token_info else: @@ -80,8 +80,8 @@ def verify_token(token: Optional[str] = None) -> Dict[str, None]: "no_user", "You are in development mode, but there are no users in the database. Add one, and it will use it.", ) - token = g.user.encode_auth_token() - token_info = UserModel.decode_auth_token(token) + token_from_user = g.user.encode_auth_token() + token_info = UserModel.decode_auth_token(token_from_user) return token_info diff --git a/src/spiffworkflow_backend/scripts/script.py b/src/spiffworkflow_backend/scripts/script.py index 113516bc..585392d7 100644 --- a/src/spiffworkflow_backend/scripts/script.py +++ b/src/spiffworkflow_backend/scripts/script.py @@ -1,10 +1,16 @@ """Script.""" +from __future__ import annotations + +from abc import abstractmethod import importlib import os import pkgutil +from typing import Any, Callable, Type from flask_bpmn.api.api_error import ApiError +from spiffworkflow_backend.models.task import Task + # Generally speaking, having some global in a flask app is TERRIBLE. # This is here, because after loading the application this will never change under @@ -15,11 +21,12 @@ SCRIPT_SUB_CLASSES = None class Script: """Provides an abstract class that defines how scripts should work, this must be extended in all Script Tasks.""" - def get_description(self): + def get_description(self) -> None: """Get_description.""" raise ApiError("invalid_script", "This script does not supply a description.") - def do_task(self, task, workflow_id, *args, **kwargs): + @abstractmethod + def do_task(self, task: Task, workflow_id: int, *args: list[Any], **kwargs: dict[Any, Any]) -> None: """Do_task.""" raise ApiError( "invalid_script", @@ -28,7 +35,8 @@ class Script: + "does not properly implement the do_task function.", ) - def do_task_validate_only(self, task, workflow_id, *args, **kwargs): + @abstractmethod + def do_task_validate_only(self, task: Task, workflow_id: int, *args: list[Any], **kwargs: dict[Any, Any]) -> None: """Do_task_validate_only.""" raise ApiError( "invalid_script", @@ -39,7 +47,7 @@ class Script: ) @staticmethod - def generate_augmented_list(task, workflow_id): + def generate_augmented_list(task: Task, workflow_id: int) -> dict[str, Callable]: """This makes a dictionary of lambda functions that are closed over the class instance that they represent. This is passed into PythonScriptParser as a list of helper functions that are @@ -50,7 +58,7 @@ class Script: updating the task data. """ - def make_closure(subclass, task, workflow_id): + def make_closure(subclass: Type[Script], task: Task, workflow_id: int) -> Callable: """Yes - this is black magic. Essentially, we want to build a list of all of the submodules (i.e. email, user_data_get, etc) @@ -75,7 +83,7 @@ class Script: return execlist @staticmethod - def generate_augmented_validate_list(task, workflow_id): + def generate_augmented_validate_list(task: Task, workflow_id: int) -> dict[str, Callable]: """This makes a dictionary of lambda functions that are closed over the class instance that they represent. This is passed into PythonScriptParser as a list of helper functions that are @@ -86,7 +94,7 @@ class Script: updating the task data. """ - def make_closure_validate(subclass, task, workflow_id): + def make_closure_validate(subclass: Type[Script], task: Task, workflow_id: int) -> Callable: """Make_closure_validate.""" instance = subclass() return lambda *a, **b: subclass.do_task_validate_only( @@ -103,7 +111,7 @@ class Script: return execlist @classmethod - def get_all_subclasses(cls): + def get_all_subclasses(cls) -> list[Type[Script]]: """Get_all_subclasses.""" # This is expensive to generate, never changes after we load up. global SCRIPT_SUB_CLASSES @@ -112,7 +120,7 @@ class Script: return SCRIPT_SUB_CLASSES @staticmethod - def _get_all_subclasses(cls): + def _get_all_subclasses(script_class: Any) -> list[Type[Script]]: """_get_all_subclasses.""" # hackish mess to make sure we have all the modules loaded for the scripts pkg_dir = os.path.dirname(__file__) @@ -122,13 +130,13 @@ class Script: """Returns a list of all classes that extend this class.""" all_subclasses = [] - for subclass in cls.__subclasses__(): + for subclass in script_class.__subclasses__(): all_subclasses.append(subclass) all_subclasses.extend(Script._get_all_subclasses(subclass)) return all_subclasses - def add_data_to_task(self, task, data): + def add_data_to_task(self, task: Task, data: Any) -> None: """Add_data_to_task.""" key = self.__class__.__name__ if key in task.data: @@ -140,12 +148,12 @@ class Script: class ScriptValidationError: """ScriptValidationError.""" - def __init__(self, code, message): + def __init__(self, code: str, message: str): """__init__.""" self.code = code self.message = message @classmethod - def from_api_error(cls, api_error: ApiError): + def from_api_error(cls, api_error: ApiError) -> ScriptValidationError: """From_api_error.""" return cls(api_error.code, api_error.message) diff --git a/src/spiffworkflow_backend/services/file_system_service.py b/src/spiffworkflow_backend/services/file_system_service.py index 39a1d0e3..abc51fd1 100644 --- a/src/spiffworkflow_backend/services/file_system_service.py +++ b/src/spiffworkflow_backend/services/file_system_service.py @@ -103,7 +103,7 @@ class FileSystemService: def assert_valid_file_name(file_name: str) -> None: """Assert_valid_file_name.""" file_extension = FileSystemService.get_extension(file_name) - if file_extension not in FileType._member_names_: + if file_extension not in FileType.list(): raise ApiError( "unknown_extension", "The file you provided does not have an accepted extension:" diff --git a/src/spiffworkflow_backend/services/process_model_service.py b/src/spiffworkflow_backend/services/process_model_service.py index b0a5609c..3a7f9326 100644 --- a/src/spiffworkflow_backend/services/process_model_service.py +++ b/src/spiffworkflow_backend/services/process_model_service.py @@ -74,7 +74,7 @@ class ProcessModelService(FileSystemService): path = self.workflow_path(process_model) shutil.rmtree(path) - def __remove_library_references(self, spec_id): + def __remove_library_references(self, spec_id: str) -> None: """__remove_library_references.""" for process_model in self.get_process_models(): if spec_id in process_model.libraries: @@ -82,7 +82,7 @@ class ProcessModelService(FileSystemService): self.update_spec(process_model) @property - def master_spec(self): + def master_spec(self) -> Optional[ProcessModelInfo]: """Master_spec.""" return self.get_master_spec() @@ -309,7 +309,7 @@ class ProcessModelService(FileSystemService): # workflow_metas.append(WorkflowMetadata.from_workflow(workflow)) return workflow_metas - def __scan_spec(self, path, name, process_group=None): + def __scan_spec(self, path: str, name: str, process_group: Optional[ProcessGroup] = None) -> ProcessModelInfo: """__scan_spec.""" spec_path = os.path.join(path, self.WF_JSON_FILE) is_master = FileSystemService.MASTER_SPECIFICATION in spec_path @@ -318,6 +318,11 @@ class ProcessModelService(FileSystemService): with open(spec_path) as wf_json: data = json.load(wf_json) spec = self.WF_SCHEMA.load(data) + if spec is None: + raise ApiError( + code="process_model_could_not_be_loaded_from_disk", + message=f"We could not load the process_model from disk with data: {data}", + ) else: spec = ProcessModelInfo( id=name, diff --git a/src/spiffworkflow_backend/services/spec_file_service.py b/src/spiffworkflow_backend/services/spec_file_service.py index ac4c3efa..0b080663 100644 --- a/src/spiffworkflow_backend/services/spec_file_service.py +++ b/src/spiffworkflow_backend/services/spec_file_service.py @@ -67,7 +67,7 @@ class SpecFileService(FileSystemService): file = SpecFileService.to_file_object(file_name, file_path) if file_name == workflow_spec.primary_file_name: SpecFileService.set_primary_bpmn(workflow_spec, file_name, binary_data) - elif workflow_spec.primary_file_name is None and file.type == FileType.bpmn: + elif workflow_spec.primary_file_name is None and file.type == str(FileType.bpmn): # If no primary process exists, make this pirmary process. SpecFileService.set_primary_bpmn(workflow_spec, file_name, binary_data) return file diff --git a/src/spiffworkflow_backend/services/user_service.py b/src/spiffworkflow_backend/services/user_service.py index 5c903943..0ff9200e 100644 --- a/src/spiffworkflow_backend/services/user_service.py +++ b/src/spiffworkflow_backend/services/user_service.py @@ -1,4 +1,5 @@ """User_service.""" +from typing import Any, Optional from flask import g from flask_bpmn.api.api_error import ApiError from flask_bpmn.models.db import db @@ -27,7 +28,7 @@ class UserService: def admin_is_impersonating() -> bool: """Admin_is_impersonating.""" if UserService.user_is_admin(): - admin_session: AdminSessionModel = UserService.get_admin_session() + admin_session = UserService.get_admin_session() return admin_session is not None else: @@ -39,12 +40,12 @@ class UserService: # Returns true if the given user uid is different from the current user's uid. @staticmethod - def is_different_user(uid): + def is_different_user(uid: str) -> bool: """Is_different_user.""" return UserService.has_user() and uid is not None and uid is not g.user.uid @staticmethod - def current_user(allow_admin_impersonate: bool = False) -> UserModel: + def current_user(allow_admin_impersonate: bool = False) -> Any: """Current_user.""" if not UserService.has_user(): raise ApiError( @@ -66,7 +67,7 @@ class UserService: # This method allows an admin user to start impersonating another user with the given uid. # Stops impersonating if the uid is None or invalid. @staticmethod - def start_impersonating(uid=None): + def start_impersonating(uid: Optional[str] = None) -> None: """Start_impersonating.""" if not UserService.has_user(): raise ApiError( @@ -104,7 +105,7 @@ class UserService: raise ApiError("invalid_uid", "The uid provided is not valid.") @staticmethod - def stop_impersonating(): + def stop_impersonating() -> None: """Stop_impersonating.""" if not UserService.has_user(): raise ApiError( @@ -115,13 +116,13 @@ class UserService: if "impersonate_user" in g: del g.impersonate_user - admin_session: AdminSessionModel = UserService.get_admin_session() + admin_session = UserService.get_admin_session() if admin_session: db.session.delete(admin_session) db.session.commit() @staticmethod - def in_list(uids, allow_admin_impersonate=False): + def in_list(uids: list[str], allow_admin_impersonate: bool = False) -> bool: """Returns true if the current user's id is in the given list of ids. False if there is no user, or the user is not in the list. @@ -135,7 +136,7 @@ class UserService: return False @staticmethod - def get_admin_session() -> AdminSessionModel: + def get_admin_session() -> Any: """Get_admin_session.""" if UserService.user_is_admin(): return ( @@ -151,7 +152,7 @@ class UserService: ) @staticmethod - def get_admin_session_user() -> UserModel: + def get_admin_session_user() -> Any: """Get_admin_session_user.""" if UserService.user_is_admin(): admin_session = UserService.get_admin_session()