lint and mypy

This commit is contained in:
burnettk 2022-11-12 22:07:30 -05:00
parent 4a36728ae6
commit 0ad11dcd2e
2 changed files with 55 additions and 36 deletions

View File

@ -13,7 +13,6 @@ from typing import Union
import connexion # type: ignore
import flask.wrappers
import jinja2
from spiffworkflow_backend.models.group import GroupModel
import werkzeug
from flask import Blueprint
from flask import current_app
@ -29,7 +28,9 @@ from lxml import etree # type: ignore
from lxml.builder import ElementMaker # type: ignore
from SpiffWorkflow.task import Task as SpiffTask # type: ignore
from SpiffWorkflow.task import TaskState
from sqlalchemy import and_, asc, desc
from sqlalchemy import and_
from sqlalchemy import asc
from sqlalchemy import desc
from spiffworkflow_backend.exceptions.process_entity_not_found_error import (
ProcessEntityNotFoundError,
@ -37,6 +38,7 @@ from spiffworkflow_backend.exceptions.process_entity_not_found_error import (
from spiffworkflow_backend.models.active_task import ActiveTaskModel
from spiffworkflow_backend.models.active_task_user import ActiveTaskUserModel
from spiffworkflow_backend.models.file import FileSchema
from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
from spiffworkflow_backend.models.message_model import MessageModel
from spiffworkflow_backend.models.message_triggerable_process_model import (
@ -1000,47 +1002,55 @@ def task_list_my_tasks(page: int = 1, per_page: int = 100) -> flask.wrappers.Res
return make_response(jsonify(response_json), 200)
def task_list_for_my_open_processes(page: int = 1, per_page: int = 100) -> flask.wrappers.Response:
def task_list_for_my_open_processes(
page: int = 1, per_page: int = 100
) -> flask.wrappers.Response:
"""Task_list_for_my_open_processes."""
return get_tasks(page=page, per_page=per_page)
def task_list_for_processes_started_by_others(page: int = 1, per_page: int = 100) -> flask.wrappers.Response:
def task_list_for_processes_started_by_others(
page: int = 1, per_page: int = 100
) -> flask.wrappers.Response:
"""Task_list_for_processes_started_by_others."""
return get_tasks(processes_started_by_user=False, page=page, per_page=per_page)
def get_tasks(processes_started_by_user: bool = True, page: int = 1, per_page: int = 100) -> flask.wrappers.Response:
def get_tasks(
processes_started_by_user: bool = True, page: int = 1, per_page: int = 100
) -> flask.wrappers.Response:
"""Get_tasks."""
user_id = g.user.id
active_tasks_query = (
ActiveTaskModel.query
.outerjoin(GroupModel, GroupModel.id == ActiveTaskModel.lane_assignment_id)
ActiveTaskModel.query.outerjoin(
GroupModel, GroupModel.id == ActiveTaskModel.lane_assignment_id
)
.join(ProcessInstanceModel)
.join(UserModel, UserModel.id == ProcessInstanceModel.process_initiator_id)
)
if processes_started_by_user:
active_tasks_query = (active_tasks_query.filter(ProcessInstanceModel.process_initiator_id==user_id)
.outerjoin(ActiveTaskUserModel, and_(ActiveTaskUserModel.user_id == user_id))
)
active_tasks_query = active_tasks_query.filter(
ProcessInstanceModel.process_initiator_id == user_id
).outerjoin(ActiveTaskUserModel, and_(ActiveTaskUserModel.user_id == user_id))
else:
active_tasks_query = (active_tasks_query.filter(ProcessInstanceModel.process_initiator_id != user_id)
.join(ActiveTaskUserModel, and_(ActiveTaskUserModel.user_id == user_id))
)
active_tasks_query = active_tasks_query.filter(
ProcessInstanceModel.process_initiator_id != user_id
).join(ActiveTaskUserModel, and_(ActiveTaskUserModel.user_id == user_id))
active_tasks = (
active_tasks_query.add_columns(
ProcessInstanceModel.process_model_identifier,
ProcessInstanceModel.status.label("process_instance_status"),
ProcessInstanceModel.updated_at_in_seconds,
ProcessInstanceModel.created_at_in_seconds,
UserModel.username,
GroupModel.identifier.label("group_identifier"),
ActiveTaskModel.task_name,
ActiveTaskModel.task_title,
ActiveTaskModel.process_model_display_name,
ActiveTaskModel.process_instance_id,
ActiveTaskUserModel.user_id.label("current_user_is_potential_owner")
).paginate(page=page, per_page=per_page, error_out=False)
)
active_tasks = active_tasks_query.add_columns(
ProcessInstanceModel.process_model_identifier,
ProcessInstanceModel.status.label("process_instance_status"), # type: ignore
ProcessInstanceModel.updated_at_in_seconds,
ProcessInstanceModel.created_at_in_seconds,
UserModel.username,
GroupModel.identifier.label("group_identifier"),
ActiveTaskModel.task_name,
ActiveTaskModel.task_title,
ActiveTaskModel.process_model_display_name,
ActiveTaskModel.process_instance_id,
ActiveTaskUserModel.user_id.label("current_user_is_potential_owner"),
).paginate(page=page, per_page=per_page, error_out=False)
response_json = {
"results": active_tasks.items,
@ -1404,7 +1414,7 @@ def find_principal_or_raise() -> PrincipalModel:
def find_process_instance_by_id_or_raise(
process_instance_id: int
process_instance_id: int,
) -> ProcessInstanceModel:
"""Find_process_instance_by_id_or_raise."""
process_instance_query = ProcessInstanceModel.query.filter_by(

View File

@ -1441,7 +1441,7 @@ class TestProcessApi(BaseTest):
updated_at_in_seconds=round(time.time()),
start_in_seconds=(1000 * i) + 1000,
end_in_seconds=(1000 * i) + 2000,
bpmn_json=json.dumps({"i": i}),
bpmn_version_control_identifier=i,
)
db.session.add(process_instance)
db.session.commit()
@ -1487,7 +1487,12 @@ class TestProcessApi(BaseTest):
results = response.json["results"]
assert len(results) == 4
for i in range(4):
assert json.loads(results[i]["bpmn_json"])["i"] in (1, 2, 3, 4)
assert json.loads(results[i]["bpmn_version_control_identifier"]) in (
1,
2,
3,
4,
)
# start > 2000, end < 5000 - this should eliminate the first 2 and the last
response = client.get(
@ -1497,8 +1502,8 @@ class TestProcessApi(BaseTest):
assert response.json is not None
results = response.json["results"]
assert len(results) == 2
assert json.loads(results[0]["bpmn_json"])["i"] in (2, 3)
assert json.loads(results[1]["bpmn_json"])["i"] in (2, 3)
assert json.loads(results[0]["bpmn_version_control_identifier"]) in (2, 3)
assert json.loads(results[1]["bpmn_version_control_identifier"]) in (2, 3)
# start > 1000, start < 4000 - this should eliminate the first and the last 2
response = client.get(
@ -1508,8 +1513,8 @@ class TestProcessApi(BaseTest):
assert response.json is not None
results = response.json["results"]
assert len(results) == 2
assert json.loads(results[0]["bpmn_json"])["i"] in (1, 2)
assert json.loads(results[1]["bpmn_json"])["i"] in (1, 2)
assert json.loads(results[0]["bpmn_version_control_identifier"]) in (1, 2)
assert json.loads(results[1]["bpmn_version_control_identifier"]) in (1, 2)
# end > 2000, end < 6000 - this should eliminate the first and the last
response = client.get(
@ -1520,7 +1525,11 @@ class TestProcessApi(BaseTest):
results = response.json["results"]
assert len(results) == 3
for i in range(3):
assert json.loads(results[i]["bpmn_json"])["i"] in (1, 2, 3)
assert json.loads(results[i]["bpmn_version_control_identifier"]) in (
1,
2,
3,
)
def test_process_instance_report_list(
self,