some updates for types w/ burnettk

jasquat 2022-06-22 15:08:01 -04:00
parent f78a2b6997
commit 2a43b5ae91
13 changed files with 98 additions and 96 deletions

View File

@ -3,7 +3,7 @@ import enum
from dataclasses import dataclass
from dataclasses import field
from datetime import datetime
from typing import Optional
from typing import Any, Optional
from flask_bpmn.models.db import db
from flask_bpmn.models.db import SpiffworkflowBaseDBModel
@ -64,6 +64,10 @@ class FileType(enum.Enum):
xml = "xml"
zip = "zip"
@classmethod
def list(cls) -> list[str]:
return [el.value for el in cls]
CONTENT_TYPES = {
"bpmn": "text/xml",

View File

@ -1,7 +1,9 @@
"""Process_group."""
from __future__ import annotations
from dataclasses import dataclass
from dataclasses import field
from typing import Dict
from typing import Any, Dict
from typing import Optional
from typing import Union
@ -22,13 +24,13 @@ class ProcessGroup:
display_name: str
display_order: Optional[int] = 0
admin: Optional[bool] = False
process_models: Optional[list[ProcessModelInfo]] = field(default_factory=list)
process_models: Optional[list[ProcessModelInfo]] = field(default_factory=list[ProcessModelInfo])
def __post_init__(self) -> None:
"""__post_init__."""
self.sort_index = f"{self.display_order}:{self.id}"
def __eq__(self, other):
def __eq__(self, other: Any) -> bool:
"""__eq__."""
if not isinstance(other, ProcessGroup):
return False
@ -54,7 +56,7 @@ class ProcessGroupSchema(Schema):
@post_load
def make_process_group(
self, data: Dict[str, Union[str, bool, int]], **kwargs
self, data: Dict[str, Union[str, bool, int]], **kwargs: dict
) -> ProcessGroup:
"""Make_process_group."""
return ProcessGroup(**data)
return ProcessGroup(**data) # type: ignore
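Annotating other as Any and the return as bool is the usual way to keep a custom __eq__ mypy-clean while still rejecting unrelated types. A small self-contained sketch of the pattern on a hypothetical dataclass (not the real ProcessGroup):

    from dataclasses import dataclass, field
    from typing import Any


    @dataclass
    class Group:
        """Hypothetical stand-in for ProcessGroup."""

        id: str
        display_name: str
        members: list[str] = field(default_factory=list)

        def __eq__(self, other: Any) -> bool:
            # Bail out before touching attributes if the other object
            # is not the same kind of thing.
            if not isinstance(other, Group):
                return False
            return self.id == other.id


    print(Group("g1", "Group One") == Group("g1", "Group 1"))  # True: same id
    print(Group("g1", "Group One") == "g1")                    # False: not a Group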

View File

@ -1,6 +1,9 @@
"""Process_instance."""
from __future__ import annotations
from dataclasses import dataclass
import enum
from typing import Dict
from typing import Any, Dict, Optional
from typing import Union
import marshmallow
@ -49,7 +52,7 @@ class NavigationItemSchema(Schema):
)
@marshmallow.post_load
def make_nav(self, data, **kwargs):
def make_nav(self, data: dict[str, Any], **kwargs: dict) -> NavItem:
"""Make_nav."""
state = data.pop("state", None)
task_id = data.pop("task_id", None)
@ -165,7 +168,7 @@ class ProcessInstanceApiSchema(Schema):
state = marshmallow.fields.String(allow_none=True)
@marshmallow.post_load
def make_process_instance(self, data, **kwargs):
def make_process_instance(self, data: dict[str, Any], **kwargs: dict) -> ProcessInstanceApi:
"""Make_process_instance."""
keys = [
"id",
@ -187,59 +190,38 @@ class ProcessInstanceApiSchema(Schema):
return ProcessInstanceApi(**filtered_fields)
@dataclass
class ProcessInstanceMetadata:
"""ProcessInstanceMetadata."""
def __init__(
self,
id,
display_name=None,
description=None,
spec_version=None,
category_id=None,
category_display_name=None,
state=None,
status: ProcessInstanceStatus = None,
total_tasks=None,
completed_tasks=None,
is_review=None,
display_order=None,
state_message=None,
process_model_identifier=None,
):
"""__init__."""
self.id = id
self.display_name = display_name
self.description = description
self.spec_version = spec_version
self.category_id = category_id
self.category_display_name = category_display_name
self.state = state
self.state_message = state_message
self.status = status
self.total_tasks = total_tasks
self.completed_tasks = completed_tasks
self.is_review = is_review
self.display_order = display_order
self.process_model_identifier = process_model_identifier
id: int
display_name: Optional[str] = None
description: Optional[str] = None
spec_version: Optional[str] = None
state: Optional[str] = None
status: Optional[ProcessInstanceStatus] = None
total_tasks: Optional[int] = None
completed_tasks: Optional[int] = None
is_review: Optional[bool] = None
state_message: Optional[str] = None
process_model_identifier: Optional[str] = None
process_group_id: Optional[str] = None
@classmethod
def from_process_instance(
cls, process_instance: ProcessInstanceModel, spec: ProcessModelInfo
):
cls, process_instance: ProcessInstanceModel, process_model: ProcessModelInfo
) -> ProcessInstanceMetadata:
"""From_process_instance."""
instance = cls(
id=process_instance.id,
display_name=spec.display_name,
description=spec.description,
category_id=spec.category_id,
category_display_name=spec.category.display_name,
display_name=process_model.display_name,
description=process_model.description,
process_group_id=process_model.process_group_id,
state_message=process_instance.state_message,
status=process_instance.status,
total_tasks=process_instance.total_tasks,
completed_tasks=process_instance.completed_tasks,
is_review=spec.is_review,
display_order=spec.display_order,
is_review=process_model.is_review,
process_model_identifier=process_instance.process_model_identifier,
)
return instance
@ -261,10 +243,8 @@ class ProcessInstanceMetadataSchema(Schema):
"state",
"total_tasks",
"completed_tasks",
"display_order",
"category_id",
"process_group_id",
"is_review",
"category_display_name",
"state_message",
]
unknown = INCLUDE
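The hand-rolled __init__ above becomes a dataclass whose optional fields default to None, with from_process_instance acting as a named constructor that maps the model objects onto those fields, and the schema's field list is trimmed to match. A compact sketch of that construction pattern with made-up fields (not the real ProcessInstanceModel or ProcessModelInfo):

    from __future__ import annotations

    from dataclasses import dataclass
    from typing import Optional


    @dataclass
    class InstanceMetadata:
        """Required id first; everything else optional with a None default."""

        id: int
        display_name: Optional[str] = None
        status: Optional[str] = None
        total_tasks: Optional[int] = None

        @classmethod
        def from_parts(cls, id: int, display_name: str, status: str) -> InstanceMetadata:
            # A named constructor keeps the field mapping in one place,
            # mirroring from_process_instance in the hunk above.
            return cls(id=id, display_name=display_name, status=status)


    print(InstanceMetadata.from_parts(1, "My Model", "complete"))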

View File

@ -1,4 +1,5 @@
"""Process_instance."""
from typing import Union
from flask_bpmn.models.db import db
from flask_bpmn.models.db import SpiffworkflowBaseDBModel
from sqlalchemy import ForeignKey
@ -22,7 +23,7 @@ class ProcessInstanceReportModel(SpiffworkflowBaseDBModel):
updated_at_in_seconds = db.Column(db.Integer) # type: ignore
@property
def serialized(self):
def serialized(self) -> dict[str, Union[str, int]]:
"""Return object data in serializeable format."""
return {
"id": self.id,

View File

@ -4,7 +4,7 @@ from __future__ import annotations
import enum
from dataclasses import dataclass
from dataclasses import field
from typing import Any
from typing import Any, Optional
import marshmallow
from marshmallow import Schema
@ -30,6 +30,7 @@ class ProcessModelInfo:
display_name: str
description: str
process_group_id: str = ""
process_group: Optional[Any] = None
is_master_spec: bool | None = False
standalone: bool | None = False
library: bool | None = False
@ -80,6 +81,6 @@ class ProcessModelInfoSchema(Schema):
notification_email_on_exception = marshmallow.fields.List(marshmallow.fields.String)
@post_load
def make_spec(self, data: dict[str, str | bool | int], **_) -> ProcessModelInfo:
def make_spec(self, data: dict[str, str | bool | int | NotificationType], **_: Any) -> ProcessModelInfo:
"""Make_spec."""
return ProcessModelInfo(**data)
return ProcessModelInfo(**data) # type: ignore

View File

@ -1,5 +1,6 @@
"""Task."""
import enum
from typing import Any
import marshmallow
from marshmallow import Schema
@ -92,7 +93,7 @@ class Task:
lane: str,
form: str,
documentation: str,
data: str,
data: dict[str, Any],
multi_instance_type: str,
multi_instance_count: str,
multi_instance_index: str,
@ -234,8 +235,7 @@ class TaskSchema(Schema):
process_name = marshmallow.fields.String(required=False, allow_none=True)
lane = marshmallow.fields.String(required=False, allow_none=True)
# TODO: implement
# @marshmallow.post_load
# def make_task(self, data: list[str], **_):
# """Make_task."""
# return Task(**data)
@marshmallow.post_load
def make_task(self, data: dict[str, Any], **kwargs: dict) -> Task:
"""Make_task."""
return Task(**data)
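The previously commented-out hook is enabled and typed like the other post_load hooks in this commit: validated data arrives as dict[str, Any], **kwargs absorbs marshmallow's extra arguments, and the return type is the constructed object. A minimal sketch of that hook shape on a made-up schema (not the full TaskSchema):

    from typing import Any

    import marshmallow
    from marshmallow import Schema


    class Widget:
        """Hypothetical object built by the schema below."""

        def __init__(self, id: str, title: str) -> None:
            self.id = id
            self.title = title


    class WidgetSchema(Schema):
        id = marshmallow.fields.String(required=True)
        title = marshmallow.fields.String(required=True)

        @marshmallow.post_load
        def make_widget(self, data: dict[str, Any], **kwargs: dict) -> Widget:
            """Turn the validated dict into a real object after loading."""
            return Widget(**data)


    widget = WidgetSchema().load({"id": "w1", "title": "A widget"})
    print(widget.title)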

View File

@ -24,7 +24,7 @@ class UserModel(SpiffworkflowBaseDBModel):
name = db.Column(db.String(50)) # type: ignore
email = db.Column(db.String(50)) # type: ignore
user_group_assignments = relationship(UserGroupAssignmentModel, cascade="delete")
groups = relationship(
groups = relationship( # type: ignore
GroupModel,
viewonly=True,
secondary="user_group_assignment",

View File

@ -14,7 +14,7 @@ from spiffworkflow_backend.models.user import UserModel
"""
def verify_token(token: Optional[str] = None) -> Dict[str, None]:
def verify_token(token: Optional[str] = None) -> Dict[str, Optional[str]]:
"""Verify the token for the user (if provided).
If in a production environment and no token is provided, gets the user from the SSO headers and returns their token.
@ -60,9 +60,9 @@ def verify_token(token: Optional[str] = None) -> Dict[str, None]:
# If the user is valid, store the user and token for this session
if db_user is not None:
g.user = db_user
token = g.user.encode_auth_token()
g.token = token
token_info = UserModel.decode_auth_token(token)
token_from_user = g.user.encode_auth_token()
g.token = token_from_user
token_info = UserModel.decode_auth_token(token_from_user)
return token_info
else:
@ -80,8 +80,8 @@ def verify_token(token: Optional[str] = None) -> Dict[str, None]:
"no_user",
"You are in development mode, but there are no users in the database. Add one, and it will use it.",
)
token = g.user.encode_auth_token()
token_info = UserModel.decode_auth_token(token)
token_from_user = g.user.encode_auth_token()
token_info = UserModel.decode_auth_token(token_from_user)
return token_info
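The refactor stops reusing the token parameter for the freshly encoded token and introduces token_from_user instead, so the Optional[str] argument and the locally generated token stay distinct. A rough sketch of the resulting shape, with hypothetical encode/decode helpers standing in for the UserModel methods:

    from typing import Dict, Optional


    def encode_auth_token(user_id: str) -> str:
        """Hypothetical stand-in for UserModel.encode_auth_token."""
        return f"token-for-{user_id}"


    def decode_auth_token(auth_token: str) -> Dict[str, Optional[str]]:
        """Hypothetical stand-in for UserModel.decode_auth_token."""
        return {"sub": auth_token.replace("token-for-", "")}


    def verify_token(token: Optional[str] = None) -> Dict[str, Optional[str]]:
        """Verify the provided token, or mint one for the known user."""
        if token is not None:
            return decode_auth_token(token)
        # Keep the generated token in its own local instead of reassigning
        # the Optional[str] parameter, mirroring the rename in the hunk above.
        token_from_user = encode_auth_token("some.user")
        return decode_auth_token(token_from_user)


    print(verify_token())
    print(verify_token("token-for-alice"))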

View File

@ -1,10 +1,16 @@
"""Script."""
from __future__ import annotations
from abc import abstractmethod
import importlib
import os
import pkgutil
from typing import Any, Callable, Type
from flask_bpmn.api.api_error import ApiError
from spiffworkflow_backend.models.task import Task
# Generally speaking, having some global in a flask app is TERRIBLE.
# This is here, because after loading the application this will never change under
@ -15,11 +21,12 @@ SCRIPT_SUB_CLASSES = None
class Script:
"""Provides an abstract class that defines how scripts should work, this must be extended in all Script Tasks."""
def get_description(self):
def get_description(self) -> None:
"""Get_description."""
raise ApiError("invalid_script", "This script does not supply a description.")
def do_task(self, task, workflow_id, *args, **kwargs):
@abstractmethod
def do_task(self, task: Task, workflow_id: int, *args: list[Any], **kwargs: dict[Any, Any]) -> None:
"""Do_task."""
raise ApiError(
"invalid_script",
@ -28,7 +35,8 @@ class Script:
+ "does not properly implement the do_task function.",
)
def do_task_validate_only(self, task, workflow_id, *args, **kwargs):
@abstractmethod
def do_task_validate_only(self, task: Task, workflow_id: int, *args: list[Any], **kwargs: dict[Any, Any]) -> None:
"""Do_task_validate_only."""
raise ApiError(
"invalid_script",
@ -39,7 +47,7 @@ class Script:
)
@staticmethod
def generate_augmented_list(task, workflow_id):
def generate_augmented_list(task: Task, workflow_id: int) -> dict[str, Callable]:
"""This makes a dictionary of lambda functions that are closed over the class instance that they represent.
This is passed into PythonScriptParser as a list of helper functions that are
@ -50,7 +58,7 @@ class Script:
updating the task data.
"""
def make_closure(subclass, task, workflow_id):
def make_closure(subclass: Type[Script], task: Task, workflow_id: int) -> Callable:
"""Yes - this is black magic.
Essentially, we want to build a list of all of the submodules (i.e. email, user_data_get, etc)
@ -75,7 +83,7 @@ class Script:
return execlist
@staticmethod
def generate_augmented_validate_list(task, workflow_id):
def generate_augmented_validate_list(task: Task, workflow_id: int) -> dict[str, Callable]:
"""This makes a dictionary of lambda functions that are closed over the class instance that they represent.
This is passed into PythonScriptParser as a list of helper functions that are
@ -86,7 +94,7 @@ class Script:
updating the task data.
"""
def make_closure_validate(subclass, task, workflow_id):
def make_closure_validate(subclass: Type[Script], task: Task, workflow_id: int) -> Callable:
"""Make_closure_validate."""
instance = subclass()
return lambda *a, **b: subclass.do_task_validate_only(
@ -103,7 +111,7 @@ class Script:
return execlist
@classmethod
def get_all_subclasses(cls):
def get_all_subclasses(cls) -> list[Type[Script]]:
"""Get_all_subclasses."""
# This is expensive to generate, never changes after we load up.
global SCRIPT_SUB_CLASSES
@ -112,7 +120,7 @@ class Script:
return SCRIPT_SUB_CLASSES
@staticmethod
def _get_all_subclasses(cls):
def _get_all_subclasses(script_class: Any) -> list[Type[Script]]:
"""_get_all_subclasses."""
# hackish mess to make sure we have all the modules loaded for the scripts
pkg_dir = os.path.dirname(__file__)
@ -122,13 +130,13 @@ class Script:
"""Returns a list of all classes that extend this class."""
all_subclasses = []
for subclass in cls.__subclasses__():
for subclass in script_class.__subclasses__():
all_subclasses.append(subclass)
all_subclasses.extend(Script._get_all_subclasses(subclass))
return all_subclasses
def add_data_to_task(self, task, data):
def add_data_to_task(self, task: Task, data: Any) -> None:
"""Add_data_to_task."""
key = self.__class__.__name__
if key in task.data:
@ -140,12 +148,12 @@ class Script:
class ScriptValidationError:
"""ScriptValidationError."""
def __init__(self, code, message):
def __init__(self, code: str, message: str):
"""__init__."""
self.code = code
self.message = message
@classmethod
def from_api_error(cls, api_error: ApiError):
def from_api_error(cls, api_error: ApiError) -> ScriptValidationError:
"""From_api_error."""
return cls(api_error.code, api_error.message)
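The docstrings above describe the "black magic": every Script subclass is instantiated once and wrapped in a lambda that closes over that instance plus the current task and workflow id, and the resulting dict of callables is handed to the script engine. A stripped-down sketch of that closure pattern, with assumptions noted (the real code keys the dict by script submodule name, e.g. email or user_data_get, and loads those modules via pkgutil first; here a single hypothetical subclass is keyed by class name):

    from typing import Any, Callable, Type


    class Script:
        """Minimal stand-in for the real Script base class."""

        def do_task(self, task: dict, workflow_id: int, *args: Any, **kwargs: Any) -> None:
            raise NotImplementedError

        @staticmethod
        def generate_augmented_list(task: dict, workflow_id: int) -> dict[str, Callable]:
            """Build a dict of helper callables, one per Script subclass."""

            def make_closure(subclass: Type["Script"], task: dict, workflow_id: int) -> Callable:
                # Instantiate once, then close over the instance and the current
                # task/workflow so callers only pass the script's own arguments.
                instance = subclass()
                return lambda *a, **kw: instance.do_task(task, workflow_id, *a, **kw)

            return {
                subclass.__name__: make_closure(subclass, task, workflow_id)
                for subclass in Script.__subclasses__()
            }


    class AddGreeting(Script):
        """Example subclass; in the real app these live in the scripts package."""

        def do_task(self, task: dict, workflow_id: int, *args: Any, **kwargs: Any) -> None:
            task["greeting"] = f"hello from workflow {workflow_id}"


    task_data: dict = {}
    helpers = Script.generate_augmented_list(task_data, workflow_id=42)
    helpers["AddGreeting"]()
    print(task_data)  # {'greeting': 'hello from workflow 42'}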

View File

@ -103,7 +103,7 @@ class FileSystemService:
def assert_valid_file_name(file_name: str) -> None:
"""Assert_valid_file_name."""
file_extension = FileSystemService.get_extension(file_name)
if file_extension not in FileType._member_names_:
if file_extension not in FileType.list():
raise ApiError(
"unknown_extension",
"The file you provided does not have an accepted extension:"

View File

@ -74,7 +74,7 @@ class ProcessModelService(FileSystemService):
path = self.workflow_path(process_model)
shutil.rmtree(path)
def __remove_library_references(self, spec_id):
def __remove_library_references(self, spec_id: str) -> None:
"""__remove_library_references."""
for process_model in self.get_process_models():
if spec_id in process_model.libraries:
@ -82,7 +82,7 @@ class ProcessModelService(FileSystemService):
self.update_spec(process_model)
@property
def master_spec(self):
def master_spec(self) -> Optional[ProcessModelInfo]:
"""Master_spec."""
return self.get_master_spec()
@ -309,7 +309,7 @@ class ProcessModelService(FileSystemService):
# workflow_metas.append(WorkflowMetadata.from_workflow(workflow))
return workflow_metas
def __scan_spec(self, path, name, process_group=None):
def __scan_spec(self, path: str, name: str, process_group: Optional[ProcessGroup] = None) -> ProcessModelInfo:
"""__scan_spec."""
spec_path = os.path.join(path, self.WF_JSON_FILE)
is_master = FileSystemService.MASTER_SPECIFICATION in spec_path
@ -318,6 +318,11 @@ class ProcessModelService(FileSystemService):
with open(spec_path) as wf_json:
data = json.load(wf_json)
spec = self.WF_SCHEMA.load(data)
if spec is None:
raise ApiError(
code="process_model_could_not_be_loaded_from_disk",
message=f"We could not load the process_model from disk with data: {data}",
)
else:
spec = ProcessModelInfo(
id=name,

View File

@ -67,7 +67,7 @@ class SpecFileService(FileSystemService):
file = SpecFileService.to_file_object(file_name, file_path)
if file_name == workflow_spec.primary_file_name:
SpecFileService.set_primary_bpmn(workflow_spec, file_name, binary_data)
elif workflow_spec.primary_file_name is None and file.type == FileType.bpmn:
elif workflow_spec.primary_file_name is None and file.type == str(FileType.bpmn):
# If no primary process exists, make this the primary process.
SpecFileService.set_primary_bpmn(workflow_spec, file_name, binary_data)
return file

View File

@ -1,4 +1,5 @@
"""User_service."""
from typing import Any, Optional
from flask import g
from flask_bpmn.api.api_error import ApiError
from flask_bpmn.models.db import db
@ -27,7 +28,7 @@ class UserService:
def admin_is_impersonating() -> bool:
"""Admin_is_impersonating."""
if UserService.user_is_admin():
admin_session: AdminSessionModel = UserService.get_admin_session()
admin_session = UserService.get_admin_session()
return admin_session is not None
else:
@ -39,12 +40,12 @@ class UserService:
# Returns true if the given user uid is different from the current user's uid.
@staticmethod
def is_different_user(uid):
def is_different_user(uid: str) -> bool:
"""Is_different_user."""
return UserService.has_user() and uid is not None and uid is not g.user.uid
@staticmethod
def current_user(allow_admin_impersonate: bool = False) -> UserModel:
def current_user(allow_admin_impersonate: bool = False) -> Any:
"""Current_user."""
if not UserService.has_user():
raise ApiError(
@ -66,7 +67,7 @@ class UserService:
# This method allows an admin user to start impersonating another user with the given uid.
# Stops impersonating if the uid is None or invalid.
@staticmethod
def start_impersonating(uid=None):
def start_impersonating(uid: Optional[str] = None) -> None:
"""Start_impersonating."""
if not UserService.has_user():
raise ApiError(
@ -104,7 +105,7 @@ class UserService:
raise ApiError("invalid_uid", "The uid provided is not valid.")
@staticmethod
def stop_impersonating():
def stop_impersonating() -> None:
"""Stop_impersonating."""
if not UserService.has_user():
raise ApiError(
@ -115,13 +116,13 @@ class UserService:
if "impersonate_user" in g:
del g.impersonate_user
admin_session: AdminSessionModel = UserService.get_admin_session()
admin_session = UserService.get_admin_session()
if admin_session:
db.session.delete(admin_session)
db.session.commit()
@staticmethod
def in_list(uids, allow_admin_impersonate=False):
def in_list(uids: list[str], allow_admin_impersonate: bool = False) -> bool:
"""Returns true if the current user's id is in the given list of ids.
False if there is no user, or the user is not in the list.
@ -135,7 +136,7 @@ class UserService:
return False
@staticmethod
def get_admin_session() -> AdminSessionModel:
def get_admin_session() -> Any:
"""Get_admin_session."""
if UserService.user_is_admin():
return (
@ -151,7 +152,7 @@ class UserService:
)
@staticmethod
def get_admin_session_user() -> UserModel:
def get_admin_session_user() -> Any:
"""Get_admin_session_user."""
if UserService.user_is_admin():
admin_session = UserService.get_admin_session()