lint and mypy

This commit is contained in:
burnettk 2022-11-12 22:07:30 -05:00
parent 646435d200
commit fd2470965d
2 changed files with 55 additions and 36 deletions

View File

@ -13,7 +13,6 @@ from typing import Union
import connexion # type: ignore import connexion # type: ignore
import flask.wrappers import flask.wrappers
import jinja2 import jinja2
from spiffworkflow_backend.models.group import GroupModel
import werkzeug import werkzeug
from flask import Blueprint from flask import Blueprint
from flask import current_app from flask import current_app
@ -29,7 +28,9 @@ from lxml import etree # type: ignore
from lxml.builder import ElementMaker # type: ignore from lxml.builder import ElementMaker # type: ignore
from SpiffWorkflow.task import Task as SpiffTask # type: ignore from SpiffWorkflow.task import Task as SpiffTask # type: ignore
from SpiffWorkflow.task import TaskState from SpiffWorkflow.task import TaskState
from sqlalchemy import and_, asc, desc from sqlalchemy import and_
from sqlalchemy import asc
from sqlalchemy import desc
from spiffworkflow_backend.exceptions.process_entity_not_found_error import ( from spiffworkflow_backend.exceptions.process_entity_not_found_error import (
ProcessEntityNotFoundError, ProcessEntityNotFoundError,
@ -37,6 +38,7 @@ from spiffworkflow_backend.exceptions.process_entity_not_found_error import (
from spiffworkflow_backend.models.active_task import ActiveTaskModel from spiffworkflow_backend.models.active_task import ActiveTaskModel
from spiffworkflow_backend.models.active_task_user import ActiveTaskUserModel from spiffworkflow_backend.models.active_task_user import ActiveTaskUserModel
from spiffworkflow_backend.models.file import FileSchema from spiffworkflow_backend.models.file import FileSchema
from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.message_instance import MessageInstanceModel from spiffworkflow_backend.models.message_instance import MessageInstanceModel
from spiffworkflow_backend.models.message_model import MessageModel from spiffworkflow_backend.models.message_model import MessageModel
from spiffworkflow_backend.models.message_triggerable_process_model import ( from spiffworkflow_backend.models.message_triggerable_process_model import (
@ -1000,47 +1002,55 @@ def task_list_my_tasks(page: int = 1, per_page: int = 100) -> flask.wrappers.Res
return make_response(jsonify(response_json), 200) return make_response(jsonify(response_json), 200)
def task_list_for_my_open_processes(page: int = 1, per_page: int = 100) -> flask.wrappers.Response: def task_list_for_my_open_processes(
page: int = 1, per_page: int = 100
) -> flask.wrappers.Response:
"""Task_list_for_my_open_processes."""
return get_tasks(page=page, per_page=per_page) return get_tasks(page=page, per_page=per_page)
def task_list_for_processes_started_by_others(page: int = 1, per_page: int = 100) -> flask.wrappers.Response: def task_list_for_processes_started_by_others(
page: int = 1, per_page: int = 100
) -> flask.wrappers.Response:
"""Task_list_for_processes_started_by_others."""
return get_tasks(processes_started_by_user=False, page=page, per_page=per_page) return get_tasks(processes_started_by_user=False, page=page, per_page=per_page)
def get_tasks(processes_started_by_user: bool = True, page: int = 1, per_page: int = 100) -> flask.wrappers.Response: def get_tasks(
processes_started_by_user: bool = True, page: int = 1, per_page: int = 100
) -> flask.wrappers.Response:
"""Get_tasks."""
user_id = g.user.id user_id = g.user.id
active_tasks_query = ( active_tasks_query = (
ActiveTaskModel.query ActiveTaskModel.query.outerjoin(
.outerjoin(GroupModel, GroupModel.id == ActiveTaskModel.lane_assignment_id) GroupModel, GroupModel.id == ActiveTaskModel.lane_assignment_id
)
.join(ProcessInstanceModel) .join(ProcessInstanceModel)
.join(UserModel, UserModel.id == ProcessInstanceModel.process_initiator_id) .join(UserModel, UserModel.id == ProcessInstanceModel.process_initiator_id)
) )
if processes_started_by_user: if processes_started_by_user:
active_tasks_query = (active_tasks_query.filter(ProcessInstanceModel.process_initiator_id==user_id) active_tasks_query = active_tasks_query.filter(
.outerjoin(ActiveTaskUserModel, and_(ActiveTaskUserModel.user_id == user_id)) ProcessInstanceModel.process_initiator_id == user_id
) ).outerjoin(ActiveTaskUserModel, and_(ActiveTaskUserModel.user_id == user_id))
else: else:
active_tasks_query = (active_tasks_query.filter(ProcessInstanceModel.process_initiator_id != user_id) active_tasks_query = active_tasks_query.filter(
.join(ActiveTaskUserModel, and_(ActiveTaskUserModel.user_id == user_id)) ProcessInstanceModel.process_initiator_id != user_id
) ).join(ActiveTaskUserModel, and_(ActiveTaskUserModel.user_id == user_id))
active_tasks = ( active_tasks = active_tasks_query.add_columns(
active_tasks_query.add_columns( ProcessInstanceModel.process_model_identifier,
ProcessInstanceModel.process_model_identifier, ProcessInstanceModel.status.label("process_instance_status"), # type: ignore
ProcessInstanceModel.status.label("process_instance_status"), ProcessInstanceModel.updated_at_in_seconds,
ProcessInstanceModel.updated_at_in_seconds, ProcessInstanceModel.created_at_in_seconds,
ProcessInstanceModel.created_at_in_seconds, UserModel.username,
UserModel.username, GroupModel.identifier.label("group_identifier"),
GroupModel.identifier.label("group_identifier"), ActiveTaskModel.task_name,
ActiveTaskModel.task_name, ActiveTaskModel.task_title,
ActiveTaskModel.task_title, ActiveTaskModel.process_model_display_name,
ActiveTaskModel.process_model_display_name, ActiveTaskModel.process_instance_id,
ActiveTaskModel.process_instance_id, ActiveTaskUserModel.user_id.label("current_user_is_potential_owner"),
ActiveTaskUserModel.user_id.label("current_user_is_potential_owner") ).paginate(page=page, per_page=per_page, error_out=False)
).paginate(page=page, per_page=per_page, error_out=False)
)
response_json = { response_json = {
"results": active_tasks.items, "results": active_tasks.items,
@ -1404,7 +1414,7 @@ def find_principal_or_raise() -> PrincipalModel:
def find_process_instance_by_id_or_raise( def find_process_instance_by_id_or_raise(
process_instance_id: int process_instance_id: int,
) -> ProcessInstanceModel: ) -> ProcessInstanceModel:
"""Find_process_instance_by_id_or_raise.""" """Find_process_instance_by_id_or_raise."""
process_instance_query = ProcessInstanceModel.query.filter_by( process_instance_query = ProcessInstanceModel.query.filter_by(

View File

@ -1441,7 +1441,7 @@ class TestProcessApi(BaseTest):
updated_at_in_seconds=round(time.time()), updated_at_in_seconds=round(time.time()),
start_in_seconds=(1000 * i) + 1000, start_in_seconds=(1000 * i) + 1000,
end_in_seconds=(1000 * i) + 2000, end_in_seconds=(1000 * i) + 2000,
bpmn_json=json.dumps({"i": i}), bpmn_version_control_identifier=i,
) )
db.session.add(process_instance) db.session.add(process_instance)
db.session.commit() db.session.commit()
@ -1487,7 +1487,12 @@ class TestProcessApi(BaseTest):
results = response.json["results"] results = response.json["results"]
assert len(results) == 4 assert len(results) == 4
for i in range(4): for i in range(4):
assert json.loads(results[i]["bpmn_json"])["i"] in (1, 2, 3, 4) assert json.loads(results[i]["bpmn_version_control_identifier"]) in (
1,
2,
3,
4,
)
# start > 2000, end < 5000 - this should eliminate the first 2 and the last # start > 2000, end < 5000 - this should eliminate the first 2 and the last
response = client.get( response = client.get(
@ -1497,8 +1502,8 @@ class TestProcessApi(BaseTest):
assert response.json is not None assert response.json is not None
results = response.json["results"] results = response.json["results"]
assert len(results) == 2 assert len(results) == 2
assert json.loads(results[0]["bpmn_json"])["i"] in (2, 3) assert json.loads(results[0]["bpmn_version_control_identifier"]) in (2, 3)
assert json.loads(results[1]["bpmn_json"])["i"] in (2, 3) assert json.loads(results[1]["bpmn_version_control_identifier"]) in (2, 3)
# start > 1000, start < 4000 - this should eliminate the first and the last 2 # start > 1000, start < 4000 - this should eliminate the first and the last 2
response = client.get( response = client.get(
@ -1508,8 +1513,8 @@ class TestProcessApi(BaseTest):
assert response.json is not None assert response.json is not None
results = response.json["results"] results = response.json["results"]
assert len(results) == 2 assert len(results) == 2
assert json.loads(results[0]["bpmn_json"])["i"] in (1, 2) assert json.loads(results[0]["bpmn_version_control_identifier"]) in (1, 2)
assert json.loads(results[1]["bpmn_json"])["i"] in (1, 2) assert json.loads(results[1]["bpmn_version_control_identifier"]) in (1, 2)
# end > 2000, end < 6000 - this should eliminate the first and the last # end > 2000, end < 6000 - this should eliminate the first and the last
response = client.get( response = client.get(
@ -1520,7 +1525,11 @@ class TestProcessApi(BaseTest):
results = response.json["results"] results = response.json["results"]
assert len(results) == 3 assert len(results) == 3
for i in range(3): for i in range(3):
assert json.loads(results[i]["bpmn_json"])["i"] in (1, 2, 3) assert json.loads(results[i]["bpmn_version_control_identifier"]) in (
1,
2,
3,
)
def test_process_instance_report_list( def test_process_instance_report_list(
self, self,