hammer run_monkeytype
This commit is contained in:
parent
d87bcc3a2d
commit
4f1c513abb
|
@ -5,6 +5,7 @@ from typing import List
|
|||
|
||||
from flask_bpmn.api.api_error import ApiError
|
||||
from lxml import etree # type: ignore
|
||||
from lxml.etree import Element as EtreeElement # type: ignore
|
||||
from SpiffWorkflow.bpmn.parser.ValidationException import ValidationException # type: ignore
|
||||
|
||||
from spiffworkflow_backend.models.file import File
|
||||
|
@ -134,7 +135,7 @@ class SpecFileService(FileSystemService):
|
|||
if not binary_data:
|
||||
binary_data = SpecFileService.get_data(workflow_spec, file_name)
|
||||
try:
|
||||
bpmn: etree.Element = etree.fromstring(binary_data)
|
||||
bpmn: EtreeElement = etree.fromstring(binary_data)
|
||||
workflow_spec.primary_process_id = SpecFileService.get_process_id(bpmn)
|
||||
workflow_spec.primary_file_name = file_name
|
||||
workflow_spec.is_review = SpecFileService.has_swimlane(bpmn)
|
||||
|
@ -164,7 +165,7 @@ class SpecFileService(FileSystemService):
|
|||
)
|
||||
|
||||
@staticmethod
|
||||
def has_swimlane(et_root: etree.Element):
|
||||
def has_swimlane(et_root: EtreeElement):
|
||||
"""Look through XML and determine if there are any lanes present that have a label."""
|
||||
elements = et_root.xpath(
|
||||
"//bpmn:lane",
|
||||
|
@ -177,7 +178,7 @@ class SpecFileService(FileSystemService):
|
|||
return retval
|
||||
|
||||
@staticmethod
|
||||
def get_process_id(et_root: etree.Element):
|
||||
def get_process_id(et_root: EtreeElement):
|
||||
"""Get_process_id."""
|
||||
process_elements = []
|
||||
for child in et_root:
|
||||
|
@ -194,7 +195,7 @@ class SpecFileService(FileSystemService):
|
|||
|
||||
# Look for the element that has the startEvent in it
|
||||
for e in process_elements:
|
||||
this_element: etree.Element = e
|
||||
this_element: EtreeElement = e
|
||||
for child_element in list(this_element):
|
||||
if child_element.tag.endswith("startEvent"):
|
||||
return this_element.attrib["id"]
|
||||
|
|
|
@ -12,19 +12,19 @@ class UserService:
|
|||
|
||||
# Returns true if the current user is logged in.
|
||||
@staticmethod
|
||||
def has_user():
|
||||
def has_user() -> bool:
|
||||
"""Has_user."""
|
||||
return "token" in g and bool(g.token) and "user" in g and bool(g.user)
|
||||
|
||||
# Returns true if the current user is an admin.
|
||||
@staticmethod
|
||||
def user_is_admin():
|
||||
def user_is_admin() -> bool:
|
||||
"""User_is_admin."""
|
||||
return UserService.has_user() and g.user.is_admin()
|
||||
|
||||
# Returns true if the current admin user is impersonating another user.
|
||||
@staticmethod
|
||||
def admin_is_impersonating():
|
||||
def admin_is_impersonating() -> bool:
|
||||
"""Admin_is_impersonating."""
|
||||
if UserService.user_is_admin():
|
||||
admin_session: AdminSessionModel = UserService.get_admin_session()
|
||||
|
@ -44,7 +44,7 @@ class UserService:
|
|||
return UserService.has_user() and uid is not None and uid is not g.user.uid
|
||||
|
||||
@staticmethod
|
||||
def current_user(allow_admin_impersonate=False) -> UserModel:
|
||||
def current_user(allow_admin_impersonate: bool=False) -> UserModel:
|
||||
"""Current_user."""
|
||||
if not UserService.has_user():
|
||||
raise ApiError(
|
||||
|
|
|
@ -14,17 +14,17 @@ class ExampleDataLoader:
|
|||
|
||||
def create_spec(
|
||||
self,
|
||||
id,
|
||||
display_name="",
|
||||
description="",
|
||||
filepath=None,
|
||||
master_spec=False,
|
||||
process_group_id="",
|
||||
display_order=0,
|
||||
from_tests=False,
|
||||
standalone=False,
|
||||
library=False,
|
||||
):
|
||||
id: str,
|
||||
display_name: str="",
|
||||
description: str="",
|
||||
filepath: None=None,
|
||||
master_spec: bool=False,
|
||||
process_group_id: str="",
|
||||
display_order: int=0,
|
||||
from_tests: bool=False,
|
||||
standalone: bool=False,
|
||||
library: bool=False,
|
||||
) -> ProcessModelInfo:
|
||||
"""Assumes that a directory exists in static/bpmn with the same name as the given id.
|
||||
|
||||
further assumes that the [id].bpmn is the primary file for the process model.
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
"""User."""
|
||||
from typing import Any
|
||||
from typing import Dict, Optional, Any
|
||||
|
||||
from flask_bpmn.models.db import db
|
||||
from tests.spiffworkflow_backend.helpers.example_data import ExampleDataLoader
|
||||
|
@ -7,6 +7,8 @@ from tests.spiffworkflow_backend.helpers.example_data import ExampleDataLoader
|
|||
from spiffworkflow_backend.models.process_group import ProcessGroup
|
||||
from spiffworkflow_backend.models.user import UserModel
|
||||
from spiffworkflow_backend.services.process_model_service import ProcessModelService
|
||||
from flask.app import Flask
|
||||
from spiffworkflow_backend.models.process_model import ProcessModelInfo
|
||||
|
||||
|
||||
def find_or_create_user(username: str = "test_user1") -> Any:
|
||||
|
@ -20,7 +22,7 @@ def find_or_create_user(username: str = "test_user1") -> Any:
|
|||
return user
|
||||
|
||||
|
||||
def assure_process_group_exists(process_group_id=None):
|
||||
def assure_process_group_exists(process_group_id: Optional[str]=None) -> ProcessGroup:
|
||||
"""Assure_process_group_exists."""
|
||||
process_group = None
|
||||
workflow_spec_service = ProcessModelService()
|
||||
|
@ -41,13 +43,13 @@ def assure_process_group_exists(process_group_id=None):
|
|||
|
||||
|
||||
def load_test_spec(
|
||||
app,
|
||||
dir_name,
|
||||
display_name=None,
|
||||
master_spec=False,
|
||||
process_group_id=None,
|
||||
library=False,
|
||||
):
|
||||
app: Flask,
|
||||
dir_name: str,
|
||||
display_name: None=None,
|
||||
master_spec: bool=False,
|
||||
process_group_id: Optional[str]=None,
|
||||
library: bool=False,
|
||||
) -> ProcessModelInfo:
|
||||
"""Loads a spec into the database based on a directory in /tests/data."""
|
||||
process_group = None
|
||||
workflow_spec_service = ProcessModelService()
|
||||
|
@ -84,7 +86,7 @@ def load_test_spec(
|
|||
# return '?%s' % '&'.join(query_string_list)
|
||||
|
||||
|
||||
def logged_in_headers(user=None, redirect_url="http://some/frontend/url"):
|
||||
def logged_in_headers(user: Optional[UserModel]=None, redirect_url: str="http://some/frontend/url") -> Dict[str, str]:
|
||||
"""Logged_in_headers."""
|
||||
# if user is None:
|
||||
# uid = 'test_user'
|
||||
|
|
|
@ -18,10 +18,12 @@ from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
|
|||
from spiffworkflow_backend.models.process_model import ProcessModelInfo
|
||||
from spiffworkflow_backend.models.process_model import ProcessModelInfoSchema
|
||||
from spiffworkflow_backend.services.process_model_service import ProcessModelService
|
||||
from flask.app import Flask
|
||||
from typing import Dict, Iterator, Optional, Union
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def with_bpmn_file_cleanup():
|
||||
def with_bpmn_file_cleanup() -> Iterator[None]:
|
||||
"""Process_group_resource."""
|
||||
try:
|
||||
yield
|
||||
|
@ -32,13 +34,13 @@ def with_bpmn_file_cleanup():
|
|||
|
||||
|
||||
# phase 1: req_id: 7.1 Deploy process
|
||||
def test_process_model_add(app, client: FlaskClient, with_bpmn_file_cleanup):
|
||||
def test_process_model_add(app: Flask, client: FlaskClient, with_bpmn_file_cleanup: None) -> None:
|
||||
"""Test_add_new_process_model."""
|
||||
create_process_model(app, client)
|
||||
create_spec_file(app, client)
|
||||
|
||||
|
||||
def test_process_model_delete(app, client: FlaskClient, with_bpmn_file_cleanup):
|
||||
def test_process_model_delete(app: Flask, client: FlaskClient, with_bpmn_file_cleanup: None) -> None:
|
||||
"""Test_process_model_delete."""
|
||||
create_process_model(app, client)
|
||||
|
||||
|
@ -61,7 +63,7 @@ def test_process_model_delete(app, client: FlaskClient, with_bpmn_file_cleanup):
|
|||
assert process_model is None
|
||||
|
||||
|
||||
def test_process_model_update(app, client: FlaskClient, with_bpmn_file_cleanup):
|
||||
def test_process_model_update(app: Flask, client: FlaskClient, with_bpmn_file_cleanup: None) -> None:
|
||||
"""Test_process_model_update."""
|
||||
create_process_model(app, client)
|
||||
process_model = ProcessModelService().get_spec("make_cookies")
|
||||
|
@ -81,7 +83,7 @@ def test_process_model_update(app, client: FlaskClient, with_bpmn_file_cleanup):
|
|||
assert response.json["display_name"] == "Updated Display Name"
|
||||
|
||||
|
||||
def test_process_group_add(app, client: FlaskClient, with_bpmn_file_cleanup):
|
||||
def test_process_group_add(app: Flask, client: FlaskClient, with_bpmn_file_cleanup: None) -> None:
|
||||
"""Test_add_process_group."""
|
||||
process_group = ProcessGroup(
|
||||
id="test", display_name="Another Test Category", display_order=0, admin=False
|
||||
|
@ -106,7 +108,7 @@ def test_process_group_add(app, client: FlaskClient, with_bpmn_file_cleanup):
|
|||
assert persisted.id == "test"
|
||||
|
||||
|
||||
def test_process_group_delete(app, client: FlaskClient, with_bpmn_file_cleanup):
|
||||
def test_process_group_delete(app: Flask, client: FlaskClient, with_bpmn_file_cleanup: None) -> None:
|
||||
"""Test_process_group_delete."""
|
||||
process_group_id = "test"
|
||||
process_group_display_name = "My Process Group"
|
||||
|
@ -129,7 +131,7 @@ def test_process_group_delete(app, client: FlaskClient, with_bpmn_file_cleanup):
|
|||
print(f"test_process_group_delete: {__name__}")
|
||||
|
||||
|
||||
def test_process_group_update(app, client: FlaskClient, with_bpmn_file_cleanup):
|
||||
def test_process_group_update(app: Flask, client: FlaskClient, with_bpmn_file_cleanup: None) -> None:
|
||||
"""Test Process Group Update."""
|
||||
group_id = "test_process_group"
|
||||
group_display_name = "Test Group"
|
||||
|
@ -157,8 +159,8 @@ def test_process_group_update(app, client: FlaskClient, with_bpmn_file_cleanup):
|
|||
|
||||
|
||||
def test_process_model_file_update_fails_if_no_file_given(
|
||||
app, client: FlaskClient, with_bpmn_file_cleanup
|
||||
):
|
||||
app: Flask, client: FlaskClient, with_bpmn_file_cleanup: None
|
||||
) -> None:
|
||||
"""Test_process_model_file_update."""
|
||||
create_spec_file(app, client)
|
||||
|
||||
|
@ -178,8 +180,8 @@ def test_process_model_file_update_fails_if_no_file_given(
|
|||
|
||||
|
||||
def test_process_model_file_update_fails_if_contents_is_empty(
|
||||
app, client: FlaskClient, with_bpmn_file_cleanup
|
||||
):
|
||||
app: Flask, client: FlaskClient, with_bpmn_file_cleanup: None
|
||||
) -> None:
|
||||
"""Test_process_model_file_update."""
|
||||
create_spec_file(app, client)
|
||||
|
||||
|
@ -198,7 +200,7 @@ def test_process_model_file_update_fails_if_contents_is_empty(
|
|||
assert response.json["code"] == "file_contents_empty"
|
||||
|
||||
|
||||
def test_process_model_file_update(app, client: FlaskClient, with_bpmn_file_cleanup):
|
||||
def test_process_model_file_update(app: Flask, client: FlaskClient, with_bpmn_file_cleanup: None) -> None:
|
||||
"""Test_process_model_file_update."""
|
||||
original_file = create_spec_file(app, client)
|
||||
|
||||
|
@ -227,7 +229,7 @@ def test_process_model_file_update(app, client: FlaskClient, with_bpmn_file_clea
|
|||
assert updated_file["file_contents"] == new_file_contents.decode()
|
||||
|
||||
|
||||
def test_get_file(app, client: FlaskClient, with_bpmn_file_cleanup):
|
||||
def test_get_file(app: Flask, client: FlaskClient, with_bpmn_file_cleanup: None) -> None:
|
||||
"""Test_get_file."""
|
||||
user = find_or_create_user()
|
||||
test_process_group_id = "group_id1"
|
||||
|
@ -244,8 +246,8 @@ def test_get_file(app, client: FlaskClient, with_bpmn_file_cleanup):
|
|||
|
||||
|
||||
def test_get_workflow_from_workflow_spec(
|
||||
app, client: FlaskClient, with_bpmn_file_cleanup
|
||||
):
|
||||
app: Flask, client: FlaskClient, with_bpmn_file_cleanup: None
|
||||
) -> None:
|
||||
"""Test_get_workflow_from_workflow_spec."""
|
||||
user = find_or_create_user()
|
||||
spec = load_test_spec(app, "hello_world")
|
||||
|
@ -258,7 +260,7 @@ def test_get_workflow_from_workflow_spec(
|
|||
# assert('Task_GetName' == response.json['next_task']['name'])
|
||||
|
||||
|
||||
def test_get_process_groups_when_none(app, client: FlaskClient, with_bpmn_file_cleanup):
|
||||
def test_get_process_groups_when_none(app: Flask, client: FlaskClient, with_bpmn_file_cleanup: None) -> None:
|
||||
"""Test_get_process_groups_when_none."""
|
||||
user = find_or_create_user()
|
||||
response = client.get("/v1.0/process-groups", headers=logged_in_headers(user))
|
||||
|
@ -267,8 +269,8 @@ def test_get_process_groups_when_none(app, client: FlaskClient, with_bpmn_file_c
|
|||
|
||||
|
||||
def test_get_process_groups_when_there_are_some(
|
||||
app, client: FlaskClient, with_bpmn_file_cleanup
|
||||
):
|
||||
app: Flask, client: FlaskClient, with_bpmn_file_cleanup: None
|
||||
) -> None:
|
||||
"""Test_get_process_groups_when_there_are_some."""
|
||||
user = find_or_create_user()
|
||||
load_test_spec(app, "hello_world")
|
||||
|
@ -277,7 +279,7 @@ def test_get_process_groups_when_there_are_some(
|
|||
assert len(response.json) == 1
|
||||
|
||||
|
||||
def test_get_process_group_when_found(app, client: FlaskClient, with_bpmn_file_cleanup):
|
||||
def test_get_process_group_when_found(app: Flask, client: FlaskClient, with_bpmn_file_cleanup: None) -> None:
|
||||
"""Test_get_process_group_when_found."""
|
||||
user = find_or_create_user()
|
||||
test_process_group_id = "group_id1"
|
||||
|
@ -291,7 +293,7 @@ def test_get_process_group_when_found(app, client: FlaskClient, with_bpmn_file_c
|
|||
assert response.json["process_models"][0]["id"] == process_model_dir_name
|
||||
|
||||
|
||||
def test_get_process_model_when_found(app, client: FlaskClient, with_bpmn_file_cleanup):
|
||||
def test_get_process_model_when_found(app: Flask, client: FlaskClient, with_bpmn_file_cleanup: None) -> None:
|
||||
"""Test_get_process_model_when_found."""
|
||||
user = find_or_create_user()
|
||||
test_process_group_id = "group_id1"
|
||||
|
@ -308,8 +310,8 @@ def test_get_process_model_when_found(app, client: FlaskClient, with_bpmn_file_c
|
|||
|
||||
|
||||
def test_get_process_model_when_not_found(
|
||||
app, client: FlaskClient, with_bpmn_file_cleanup
|
||||
):
|
||||
app: Flask, client: FlaskClient, with_bpmn_file_cleanup: None
|
||||
) -> None:
|
||||
"""Test_get_process_model_when_not_found."""
|
||||
user = find_or_create_user()
|
||||
process_model_dir_name = "THIS_NO_EXISTS"
|
||||
|
@ -322,7 +324,7 @@ def test_get_process_model_when_not_found(
|
|||
assert response.json["code"] == "process_mode_cannot_be_found"
|
||||
|
||||
|
||||
def test_process_instance_create(app, client: FlaskClient, with_bpmn_file_cleanup):
|
||||
def test_process_instance_create(app: Flask, client: FlaskClient, with_bpmn_file_cleanup: None) -> None:
|
||||
"""Test_process_instance_create."""
|
||||
test_process_group_id = "runs_without_input"
|
||||
process_model_dir_name = "sample"
|
||||
|
@ -334,8 +336,8 @@ def test_process_instance_create(app, client: FlaskClient, with_bpmn_file_cleanu
|
|||
|
||||
|
||||
def test_process_instance_list_with_default_list(
|
||||
app, client: FlaskClient, with_bpmn_file_cleanup
|
||||
):
|
||||
app: Flask, client: FlaskClient, with_bpmn_file_cleanup: None
|
||||
) -> None:
|
||||
"""Test_process_instance_list_with_default_list."""
|
||||
db.session.query(ProcessInstanceModel).delete()
|
||||
db.session.commit()
|
||||
|
@ -374,8 +376,8 @@ def test_process_instance_list_with_default_list(
|
|||
|
||||
|
||||
def test_process_instance_list_with_paginated_items(
|
||||
app, client: FlaskClient, with_bpmn_file_cleanup
|
||||
):
|
||||
app: Flask, client: FlaskClient, with_bpmn_file_cleanup: None
|
||||
) -> None:
|
||||
"""Test_process_instance_list_with_paginated_items."""
|
||||
db.session.query(ProcessInstanceModel).delete()
|
||||
db.session.commit()
|
||||
|
@ -422,8 +424,8 @@ def test_process_instance_list_with_paginated_items(
|
|||
|
||||
|
||||
def create_process_instance(
|
||||
app, client: FlaskClient, test_process_group_id, process_model_dir_name, headers
|
||||
):
|
||||
app: Flask, client: FlaskClient, test_process_group_id: str, process_model_dir_name: str, headers: Dict[str, str]
|
||||
) -> None:
|
||||
"""Create_process_instance."""
|
||||
load_test_spec(app, process_model_dir_name, process_group_id=test_process_group_id)
|
||||
response = client.post(
|
||||
|
@ -438,7 +440,7 @@ def create_process_instance(
|
|||
assert response.json["data"]["person"] == "Kevin"
|
||||
|
||||
|
||||
def create_process_model(app, client: FlaskClient):
|
||||
def create_process_model(app: Flask, client: FlaskClient) -> None:
|
||||
"""Create_process_model."""
|
||||
process_model_service = ProcessModelService()
|
||||
assert 0 == len(process_model_service.get_specs())
|
||||
|
@ -475,7 +477,7 @@ def create_process_model(app, client: FlaskClient):
|
|||
assert 1 == len(process_model_service.get_process_groups())
|
||||
|
||||
|
||||
def create_spec_file(app, client: FlaskClient):
|
||||
def create_spec_file(app: Flask, client: FlaskClient) -> Dict[str, Optional[Union[str, bool, int]]]:
|
||||
"""Test_create_spec_file."""
|
||||
spec = load_test_spec(app, "random_fact")
|
||||
data = {"file": (io.BytesIO(b"abcdef"), "random_fact.svg")}
|
||||
|
|
Loading…
Reference in New Issue