mypy should be passing now

This commit is contained in:
jasquat 2022-05-18 17:07:35 -04:00
parent 0f6ef58503
commit 2f918f5418
13 changed files with 303 additions and 269 deletions

2
poetry.lock generated
View File

@ -609,7 +609,7 @@ werkzeug = "*"
type = "git"
url = "https://github.com/sartography/flask-bpmn"
reference = "main"
resolved_reference = "9d102ccc9783146d4619faf53ab8aac225654d07"
resolved_reference = "9d0911c9bc1e33160b9decadfa1b67f673406ac2"
[[package]]
name = "flask-cors"

View File

@ -1,6 +1,7 @@
"""__init__."""
import flask.app
from flask import Flask
from flask_bpmn.api.api_error import api_error_blueprint
from flask_bpmn.models.db import db
from flask_bpmn.models.db import migrate
@ -23,5 +24,6 @@ def create_app() -> flask.app.Flask:
app.register_blueprint(user_blueprint)
app.register_blueprint(api_blueprint)
app.register_blueprint(api_error_blueprint)
return app

View File

@ -1,10 +1,9 @@
"""Group."""
from flask_bpmn.models.db import db
from flask_bpmn.models.group import FlaskBpmnGroupModel
from sqlalchemy.orm import relationship
from sqlalchemy.orm import relationship # type: ignore
class GroupModel(FlaskBpmnGroupModel):
class GroupModel(db.Model): # type: ignore
"""GroupModel."""
__tablename__ = "group"

View File

@ -1,9 +1,9 @@
"""Process_model."""
from flask_bpmn.models.db import db
from sqlalchemy.orm import deferred
from sqlalchemy.orm import deferred # type: ignore
class ProcessModel(db.Model):
class ProcessModel(db.Model): # type: ignore
"""ProcessModel."""
__tablename__ = "process_model"

View File

@ -1,9 +1,9 @@
"""User."""
from flask_bpmn.models.db import db
from sqlalchemy.orm import relationship
from sqlalchemy.orm import relationship # type: ignore
class UserModel(db.Model):
class UserModel(db.Model): # type: ignore
"""UserModel."""
__tablename__ = "user"

View File

@ -1,13 +1,13 @@
"""UserGroupAssignment."""
from flask_bpmn.models.db import db
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy import ForeignKey # type: ignore
from sqlalchemy.orm import relationship # type: ignore
from spiff_workflow_webapp.models.group import GroupModel
from spiff_workflow_webapp.models.user import UserModel
class UserGroupAssignmentModel(db.Model):
class UserGroupAssignmentModel(db.Model): # type: ignore
"""UserGroupAssignmentModel."""
__tablename__ = "user_group_assignment"

View File

@ -1,10 +1,11 @@
"""Video."""
from ..extensions import db
class Video(db.Model):
"""Video."""
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(50))
url = db.Column(db.String(50))
"""video."""
# """Video."""
# from ..extensions import db
#
#
# class Video(db.Model):
# """Video."""
#
# id = db.Column(db.Integer, primary_key=True)
# name = db.Column(db.String(50))
# url = db.Column(db.String(50))

View File

@ -1,167 +1,168 @@
"""Workflow."""
import enum
import marshmallow
from crc import db
from crc import ma
from marshmallow import post_load
from sqlalchemy import func
from sqlalchemy.orm import deferred
class WorkflowSpecCategory:
"""WorkflowSpecCategory."""
def __init__(self, id, display_name, display_order=0, admin=False):
"""__init__."""
self.id = (
id # A unique string name, lower case, under scores (ie, 'my_category')
)
self.display_name = display_name
self.display_order = display_order
self.admin = admin
self.workflows = [] # For storing Workflow Metadata
self.specs = [] # For the list of specifications associated with a category
self.meta = None # For storing category metadata
def __eq__(self, other):
"""__eq__."""
if not isinstance(other, WorkflowSpecCategory):
return False
if other.id == self.id:
return True
return False
class WorkflowSpecCategorySchema(ma.Schema):
"""WorkflowSpecCategorySchema."""
class Meta:
"""Meta."""
model = WorkflowSpecCategory
fields = ["id", "display_name", "display_order", "admin"]
@post_load
def make_cat(self, data, **kwargs):
"""Make_cat."""
return WorkflowSpecCategory(**data)
class WorkflowSpecInfo:
"""WorkflowSpecInfo."""
def __init__(
self,
id,
display_name,
description,
is_master_spec=False,
standalone=False,
library=False,
primary_file_name="",
primary_process_id="",
libraries=None,
category_id="",
display_order=0,
is_review=False,
):
"""__init__."""
self.id = id # Sting unique id
self.display_name = display_name
self.description = description
self.display_order = display_order
self.is_master_spec = is_master_spec
self.standalone = standalone
self.library = library
self.primary_file_name = primary_file_name
self.primary_process_id = primary_process_id
self.is_review = is_review
self.category_id = category_id
if libraries is None:
libraries = []
self.libraries = libraries
def __eq__(self, other):
"""__eq__."""
if not isinstance(other, WorkflowSpecInfo):
return False
if other.id == self.id:
return True
return False
class WorkflowSpecInfoSchema(ma.Schema):
"""WorkflowSpecInfoSchema."""
class Meta:
"""Meta."""
model = WorkflowSpecInfo
id = marshmallow.fields.String(required=True)
display_name = marshmallow.fields.String(required=True)
description = marshmallow.fields.String()
is_master_spec = marshmallow.fields.Boolean(required=True)
standalone = marshmallow.fields.Boolean(required=True)
library = marshmallow.fields.Boolean(required=True)
display_order = marshmallow.fields.Integer(allow_none=True)
primary_file_name = marshmallow.fields.String(allow_none=True)
primary_process_id = marshmallow.fields.String(allow_none=True)
is_review = marshmallow.fields.Boolean(allow_none=True)
category_id = marshmallow.fields.String(allow_none=True)
libraries = marshmallow.fields.List(marshmallow.fields.String(), allow_none=True)
@post_load
def make_spec(self, data, **kwargs):
"""Make_spec."""
return WorkflowSpecInfo(**data)
class WorkflowState(enum.Enum):
"""WorkflowState."""
hidden = "hidden"
disabled = "disabled"
required = "required"
optional = "optional"
locked = "locked"
@classmethod
def has_value(cls, value):
"""Has_value."""
return value in cls._value2member_map_
@staticmethod
def list():
"""List."""
return list(map(lambda c: c.value, WorkflowState))
class WorkflowStatus(enum.Enum):
"""WorkflowStatus."""
not_started = "not_started"
user_input_required = "user_input_required"
waiting = "waiting"
complete = "complete"
erroring = "erroring"
class WorkflowModel(db.Model):
"""WorkflowModel."""
__tablename__ = "workflow"
id = db.Column(db.Integer, primary_key=True)
bpmn_workflow_json = deferred(db.Column(db.JSON))
status = db.Column(db.Enum(WorkflowStatus))
study_id = db.Column(db.Integer, db.ForeignKey("study.id"))
study = db.relationship("StudyModel", backref="workflow", lazy="select")
workflow_spec_id = db.Column(db.String)
total_tasks = db.Column(db.Integer, default=0)
completed_tasks = db.Column(db.Integer, default=0)
last_updated = db.Column(db.DateTime(timezone=True), server_default=func.now())
user_id = db.Column(db.String, default=None)
state = db.Column(db.String, nullable=True)
state_message = db.Column(db.String, nullable=True)
"""workflow."""
# """Workflow."""
# import enum
#
# import marshmallow
# from crc import db
# from crc import ma
# from marshmallow import post_load
# from sqlalchemy import func
# from sqlalchemy.orm import deferred
#
#
# class WorkflowSpecCategory:
# """WorkflowSpecCategory."""
#
# def __init__(self, id, display_name, display_order=0, admin=False):
# """__init__."""
# self.id = (
# id # A unique string name, lower case, under scores (ie, 'my_category')
# )
# self.display_name = display_name
# self.display_order = display_order
# self.admin = admin
# self.workflows = [] # For storing Workflow Metadata
# self.specs = [] # For the list of specifications associated with a category
# self.meta = None # For storing category metadata
#
# def __eq__(self, other):
# """__eq__."""
# if not isinstance(other, WorkflowSpecCategory):
# return False
# if other.id == self.id:
# return True
# return False
#
#
# class WorkflowSpecCategorySchema(ma.Schema):
# """WorkflowSpecCategorySchema."""
#
# class Meta:
# """Meta."""
#
# model = WorkflowSpecCategory
# fields = ["id", "display_name", "display_order", "admin"]
#
# @post_load
# def make_cat(self, data, **kwargs):
# """Make_cat."""
# return WorkflowSpecCategory(**data)
#
#
# class WorkflowSpecInfo:
# """WorkflowSpecInfo."""
#
# def __init__(
# self,
# id,
# display_name,
# description,
# is_master_spec=False,
# standalone=False,
# library=False,
# primary_file_name="",
# primary_process_id="",
# libraries=None,
# category_id="",
# display_order=0,
# is_review=False,
# ):
# """__init__."""
# self.id = id # Sting unique id
# self.display_name = display_name
# self.description = description
# self.display_order = display_order
# self.is_master_spec = is_master_spec
# self.standalone = standalone
# self.library = library
# self.primary_file_name = primary_file_name
# self.primary_process_id = primary_process_id
# self.is_review = is_review
# self.category_id = category_id
#
# if libraries is None:
# libraries = []
# self.libraries = libraries
#
# def __eq__(self, other):
# """__eq__."""
# if not isinstance(other, WorkflowSpecInfo):
# return False
# if other.id == self.id:
# return True
# return False
#
#
# class WorkflowSpecInfoSchema(ma.Schema):
# """WorkflowSpecInfoSchema."""
#
# class Meta:
# """Meta."""
#
# model = WorkflowSpecInfo
#
# id = marshmallow.fields.String(required=True)
# display_name = marshmallow.fields.String(required=True)
# description = marshmallow.fields.String()
# is_master_spec = marshmallow.fields.Boolean(required=True)
# standalone = marshmallow.fields.Boolean(required=True)
# library = marshmallow.fields.Boolean(required=True)
# display_order = marshmallow.fields.Integer(allow_none=True)
# primary_file_name = marshmallow.fields.String(allow_none=True)
# primary_process_id = marshmallow.fields.String(allow_none=True)
# is_review = marshmallow.fields.Boolean(allow_none=True)
# category_id = marshmallow.fields.String(allow_none=True)
# libraries = marshmallow.fields.List(marshmallow.fields.String(), allow_none=True)
#
# @post_load
# def make_spec(self, data, **kwargs):
# """Make_spec."""
# return WorkflowSpecInfo(**data)
#
#
# class WorkflowState(enum.Enum):
# """WorkflowState."""
#
# hidden = "hidden"
# disabled = "disabled"
# required = "required"
# optional = "optional"
# locked = "locked"
#
# @classmethod
# def has_value(cls, value):
# """Has_value."""
# return value in cls._value2member_map_
#
# @staticmethod
# def list():
# """List."""
# return list(map(lambda c: c.value, WorkflowState))
#
#
# class WorkflowStatus(enum.Enum):
# """WorkflowStatus."""
#
# not_started = "not_started"
# user_input_required = "user_input_required"
# waiting = "waiting"
# complete = "complete"
# erroring = "erroring"
#
#
# class WorkflowModel(db.Model):
# """WorkflowModel."""
#
# __tablename__ = "workflow"
# id = db.Column(db.Integer, primary_key=True)
# bpmn_workflow_json = deferred(db.Column(db.JSON))
# status = db.Column(db.Enum(WorkflowStatus))
# study_id = db.Column(db.Integer, db.ForeignKey("study.id"))
# study = db.relationship("StudyModel", backref="workflow", lazy="select")
# workflow_spec_id = db.Column(db.String)
# total_tasks = db.Column(db.Integer, default=0)
# completed_tasks = db.Column(db.Integer, default=0)
# last_updated = db.Column(db.DateTime(timezone=True), server_default=func.now())
# user_id = db.Column(db.String, default=None)
# state = db.Column(db.String, nullable=True)
# state_message = db.Column(db.String, nullable=True)

View File

@ -1,6 +1,6 @@
"""Api."""
import os
import json
import os
from flask import Blueprint
from flask import request
@ -53,4 +53,6 @@ def run_process() -> Response:
response = run(workflow, content.get("task_identifier"), content.get("answer"))
return Response(json.dumps({"response": response}), status=200, mimetype="application/json")
return Response(
json.dumps({"response": response}), status=200, mimetype="application/json"
)

View File

@ -1,10 +1,12 @@
"""Main."""
import json
from typing import Any
import flask.wrappers
from flask import Blueprint
from flask import request
from flask import Response
from flask_bpmn.api.api_error import ApiError
from flask_bpmn.models.db import db
from sqlalchemy.exc import IntegrityError # type: ignore
@ -20,21 +22,21 @@ def create_user(username: str) -> flask.wrappers.Response:
"""Create_user."""
user = UserModel.query.filter_by(username=username).first()
if user is not None:
return Response(
json.dumps({"error": f"User already exists: {username}"}),
status=409,
mimetype="application/json",
raise (
ApiError(
code="user_already_exists",
message=f"User already exists: {username}",
status_code=409,
)
)
user = UserModel(username=username)
try:
db.session.add(user)
except IntegrityError as exception:
return Response(
json.dumps({"error": repr(exception)}),
status=500,
mimetype="application/json",
)
raise (
ApiError(code="integrity_error", message=repr(exception), status_code=500)
) from exception
db.session.commit()
return Response(
@ -47,10 +49,12 @@ def delete_user(username: str) -> flask.wrappers.Response:
"""Delete_user."""
user = UserModel.query.filter_by(username=username).first()
if user is None:
return Response(
json.dumps({"error": f"User cannot be found: {username}"}),
status=400,
mimetype="application/json",
raise (
ApiError(
code="user_cannot_be_found",
message=f"User cannot be found: {username}",
status_code=400,
)
)
db.session.delete(user)
@ -64,21 +68,21 @@ def create_group(group_name: str) -> flask.wrappers.Response:
"""Create_group."""
group = GroupModel.query.filter_by(name=group_name).first()
if group is not None:
return Response(
json.dumps({"error": f"Group already exists: {group_name}"}),
status=409,
mimetype="application/json",
raise (
ApiError(
code="group_already_exists",
message=f"Group already exists: {group_name}",
status_code=409,
)
)
group = GroupModel(name=group_name)
try:
db.session.add(group)
except IntegrityError as exception:
return Response(
json.dumps({"error": repr(exception)}),
status=500,
mimetype="application/json",
)
raise (
ApiError(code="integrity_error", message=repr(exception), status_code=500)
) from exception
db.session.commit()
return Response(
@ -91,10 +95,12 @@ def delete_group(group_name: str) -> flask.wrappers.Response:
"""Delete_group."""
group = GroupModel.query.filter_by(name=group_name).first()
if group is None:
return Response(
json.dumps({"error": f"Group cannot be found: {group_name}"}),
status=400,
mimetype="application/json",
raise (
ApiError(
code="group_cannot_be_found",
message=f"Group cannot be found: {group_name}",
status_code=400,
)
)
db.session.delete(group)
@ -113,10 +119,12 @@ def assign_user_to_group() -> flask.wrappers.Response:
user_id=user.id, group_id=group.id
).first()
if user_group_assignment is not None:
return Response(
json.dumps({"error": f"User ({user.id}) is already in group ({group.id})"}),
status=409,
mimetype="application/json",
raise (
ApiError(
code="user_is_already_in_group",
message=f"User ({user.id}) is already in group ({group.id})",
status_code=409,
)
)
user_group_assignment = UserGroupAssignmentModel(user_id=user.id, group_id=group.id)
@ -140,10 +148,12 @@ def remove_user_from_group() -> flask.wrappers.Response:
user_id=user.id, group_id=group.id
).first()
if user_group_assignment is None:
return Response(
json.dumps({"error": f"User ({user.id}) is not in group ({group.id})"}),
status=400,
mimetype="application/json",
raise (
ApiError(
code="user_not_in_group",
message=f"User ({user.id}) is not in group ({group.id})",
status_code=400,
)
)
db.session.delete(user_group_assignment)
@ -156,46 +166,58 @@ def remove_user_from_group() -> flask.wrappers.Response:
)
def get_value_from_request_json(key):
def get_value_from_request_json(key: str) -> Any:
"""Get_value_from_request_json."""
if request.json is None:
return None
return request.json.get(key)
def get_user_from_request() -> UserModel:
def get_user_from_request() -> Any:
"""Get_user_from_request."""
user_id = get_value_from_request_json("user_id")
if user_id is None:
return Response(
"{error:'user_id required'}", status=400, mimetype="application/json"
raise (
ApiError(
code="user_id_is_required",
message="Attribute user_id is required",
status_code=400,
)
)
user = UserModel.query.filter_by(id=user_id).first()
if user is None:
return Response(
json.dumps({"error": f"User cannot be found: {user_id}"}),
status=400,
mimetype="application/json",
raise (
ApiError(
code="user_cannot_be_found",
message=f"User cannot be found: {user_id}",
status_code=400,
)
)
return user
def get_group_from_request() -> GroupModel:
def get_group_from_request() -> Any:
"""Get_group_from_request."""
group_id = get_value_from_request_json("group_id")
if group_id is None:
return Response(
"{error:'group_id required'}", status=400, mimetype="application/json"
raise (
ApiError(
code="group_id_is_required",
message="Attribute group_id is required",
status_code=400,
)
)
group = GroupModel.query.filter_by(id=group_id).first()
if group is None:
return Response(
json.dumps({"error": f"Group cannot be found: {group_id}"}),
status=400,
mimetype="application/json",
raise (
ApiError(
code="group_cannot_be_found",
message=f"Group cannot be found: {group_id}",
status_code=400,
)
)
return group

View File

@ -41,6 +41,7 @@ class Parser(BpmnDmnParser): # type: ignore
class ProcessStatus(TypedDict, total=False):
"""ProcessStatus."""
last_task: str
upcoming_tasks: List[str]
next_activity: Dict[str, str]

View File

@ -1,9 +1,11 @@
"""Test Api Blueprint."""
import json
from typing import Union
from flask_bpmn.models.db import db
from spiff_workflow_webapp.models.process_model import ProcessModel
from flask.testing import FlaskClient
from flask_bpmn.models.db import db
from spiff_workflow_webapp.models.process_model import ProcessModel
def test_user_can_be_created_and_deleted(client: FlaskClient) -> None:
@ -34,12 +36,15 @@ def test_user_can_be_created_and_deleted(client: FlaskClient) -> None:
db.session.commit()
def run_task(client: FlaskClient, request_body: dict, last_response: str) -> None:
def run_task(
client: FlaskClient, request_body: dict, last_response: Union[None, str]
) -> None:
"""Run_task."""
response = client.post("/run_process",
content_type="application/json",
data=json.dumps(request_body),
)
response = client.post(
"/run_process",
content_type="application/json",
data=json.dumps(request_body),
)
current_response = response.data
if last_response is not None:
assert last_response != current_response

View File

@ -1,5 +1,6 @@
"""Test User Blueprint."""
import json
from typing import Any
from flask.testing import FlaskClient
@ -7,6 +8,27 @@ from spiff_workflow_webapp.models.group import GroupModel
from spiff_workflow_webapp.models.user import UserModel
def test_acceptance(client: FlaskClient) -> None:
# Create a user U
user = create_user(client, "U")
# Create a group G
group_g = create_group(client, "G")
# Assign user U to group G
assign_user_to_group(client, user, group_g)
# Delete group G
delete_group(client, group_g.name)
# Create group H
group_h = create_group(client, "H")
# Assign user U to group H
assign_user_to_group(client, user, group_h)
# Unassign user U from group H
remove_user_from_group(client, user, group_h)
# Delete group H
delete_group(client, group_h.name)
# Delete user U
delete_user(client, user.username)
def test_user_can_be_created_and_deleted(client: FlaskClient) -> None:
username = "joe"
response = client.get(f"/user/{username}")
@ -94,28 +116,7 @@ def test_user_can_be_removed_from_a_group(client: FlaskClient) -> None:
delete_group(client, group.name)
def test_acceptance(client: FlaskClient) -> None:
# Create a user U
user = create_user(client, "U")
# Create a group G
group_g = create_group(client, "G")
# Assign user U to group G
assign_user_to_group(client, user, group_g)
# Delete group G
delete_group(client, group_g.name)
# Create group H
group_h = create_group(client, "H")
# Assign user U to group H
assign_user_to_group(client, user, group_h)
# Unassign user U from group H
remove_user_from_group(client, user, group_h)
# Delete group H
delete_group(client, group_h.name)
# Delete user U
delete_user(client, user.username)
def create_user(client: FlaskClient, username: str) -> UserModel:
def create_user(client: FlaskClient, username: str) -> Any:
response = client.get(f"/user/{username}")
assert response.status_code == 201
user = UserModel.query.filter_by(username=username).first()
@ -130,7 +131,7 @@ def delete_user(client: FlaskClient, username: str) -> None:
assert user is None
def create_group(client: FlaskClient, group_name: str) -> GroupModel:
def create_group(client: FlaskClient, group_name: str) -> Any:
response = client.get(f"/group/{group_name}")
assert response.status_code == 201
group = GroupModel.query.filter_by(name=group_name).first()