PreCommit again

This commit is contained in:
mike cullerton 2022-08-01 16:24:12 -04:00
parent 5a37ea14d0
commit 49d764bd3f
7 changed files with 73 additions and 59 deletions

View File

@ -10,7 +10,6 @@ from sqlalchemy.orm import relationship
from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.user_group_assignment import UserGroupAssignmentModel
from typing import Any
class UserModel(SpiffworkflowBaseDBModel):
"""UserModel."""

View File

@ -174,9 +174,7 @@ def process_model_save(process_model_id: str, file_name: str) -> Union[str, Resp
@admin_blueprint.route("/process-models/<process_model_id>/run", methods=["GET"])
def process_model_run(process_model_id: str) -> Union[str, Response]:
"""Process_model_run."""
user = UserService().create_user(
"internal", "Mr. Test", username="Mr. Test"
)
user = UserService().create_user("internal", "Mr. Test", username="Mr. Test")
process_instance = ProcessInstanceService.create_process_instance(
process_model_id, user
)

View File

@ -1,7 +1,8 @@
"""User."""
import ast
import base64
from typing import Any, Dict
from typing import Any
from typing import Dict
from typing import Optional
import jwt
@ -9,6 +10,7 @@ from flask import current_app
from flask import g
from flask import redirect
from flask_bpmn.api.api_error import ApiError
from werkzeug.wrappers.response import Response
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.authentication_service import (
@ -17,8 +19,6 @@ from spiffworkflow_backend.services.authentication_service import (
from spiffworkflow_backend.services.authorization_service import AuthorizationService
from spiffworkflow_backend.services.user_service import UserService
from werkzeug.wrappers.response import Response
"""
.. module:: crc.api.user
:synopsis: Single Sign On (SSO) user login and session handlers
@ -58,13 +58,16 @@ def verify_token(token: Optional[str] = None) -> Dict[str, Optional[str]]:
elif "iss" in decoded_token.keys():
try:
user_info = AuthorizationService().get_user_info_from_id_token(token)
user_info = AuthorizationService().get_user_info_from_id_token(
token
)
except ApiError as ae:
raise ae
except Exception as e:
current_app.logger.error(f"Exception raised in get_token: {e}")
raise ApiError(
code="fail_get_user_info", message="Cannot get user info from token"
code="fail_get_user_info",
message="Cannot get user info from token",
) from e
if (
@ -87,10 +90,14 @@ def verify_token(token: Optional[str] = None) -> Dict[str, Optional[str]]:
)
# no user_info
else:
raise ApiError(code="no_user_info", message="Cannot retrieve user info")
raise ApiError(
code="no_user_info", message="Cannot retrieve user info"
)
else:
current_app.logger.debug("token_type not in decode_token in verify_token")
current_app.logger.debug(
"token_type not in decode_token in verify_token"
)
raise ApiError(
code="invalid_token",
message="Invalid token. Please log in.",
@ -109,31 +116,30 @@ def verify_token(token: Optional[str] = None) -> Dict[str, Optional[str]]:
else:
raise ApiError(code="no_user_id", message="Cannot get a user id")
raise ApiError(code="invalid_token",
message="Cannot validate token.",
status_code=401
)
raise ApiError(
code="invalid_token", message="Cannot validate token.", status_code=401
)
# no token -- do we ever get here?
# else:
# ...
# if current_app.config.get("DEVELOPMENT"):
# # Fall back to a default user if this is not production.
# g.user = UserModel.query.first()
# if not g.user:
# raise ApiError(
# "no_user",
# "You are in development mode, but there are no users in the database. Add one, and it will use it.",
# )
# token_from_user = g.user.encode_auth_token()
# token_info = UserModel.decode_auth_token(token_from_user)
# return token_info
#
# else:
# raise ApiError(
# code="no_auth_token",
# message="No authorization token was available.",
# status_code=401,
# )
# if current_app.config.get("DEVELOPMENT"):
# # Fall back to a default user if this is not production.
# g.user = UserModel.query.first()
# if not g.user:
# raise ApiError(
# "no_user",
# "You are in development mode, but there are no users in the database. Add one, and it will use it.",
# )
# token_from_user = g.user.encode_auth_token()
# token_info = UserModel.decode_auth_token(token_from_user)
# return token_info
#
# else:
# raise ApiError(
# code="no_auth_token",
# message="No authorization token was available.",
# status_code=401,
# )
def validate_scope(token: Any) -> bool:
@ -148,7 +154,7 @@ def validate_scope(token: Any) -> bool:
return True
def api_login(uid: str, password: str, redirect_url: str | None=None) -> dict:
def api_login(uid: str, password: str, redirect_url: str | None = None) -> dict:
"""Api_login."""
# TODO: Fix this! mac 20220801
token = PublicAuthenticationService().get_public_access_token(uid, password)
@ -167,8 +173,9 @@ def encode_auth_token(uid: str) -> str:
secret_key = current_app.config.get("SECRET_KEY")
else:
current_app.logger.error("Missing SECRET_KEY in encode_auth_token")
raise ApiError(code="encode_error",
message="Missing SECRET_KEY in encode_auth_token")
raise ApiError(
code="encode_error", message="Missing SECRET_KEY in encode_auth_token"
)
return jwt.encode(
payload,
str(secret_key),
@ -176,7 +183,7 @@ def encode_auth_token(uid: str) -> str:
)
def login(redirect_url: str="/") -> Response:
def login(redirect_url: str = "/") -> Response:
"""Login."""
state = PublicAuthenticationService.generate_state(redirect_url)
login_redirect_url = PublicAuthenticationService().get_login_redirect_url(state)
@ -229,14 +236,15 @@ def login_return(code: str, state: str, session_state: str) -> Response | None:
+ f"id_token={id_token}"
)
return redirect(redirect_url)
raise ApiError(code="invalid_login",
message="Login failed. Please try again",
status_code=401)
raise ApiError(
code="invalid_login", message="Login failed. Please try again", status_code=401
)
def logout(id_token: str, redirect_url: str | None) -> Response:
"""Logout."""
if redirect_url is None:
redirect_url = ''
redirect_url = ""
return PublicAuthenticationService().logout(
redirect_url=redirect_url, id_token=id_token
)

View File

@ -11,10 +11,10 @@ from flask import current_app
from flask import redirect
from flask_bpmn.api.api_error import ApiError
from keycloak import KeycloakOpenID # type: ignore
# from keycloak.uma_permissions import AuthStatus # noqa: F401
from werkzeug.wrappers.response import Response
from spiffworkflow_backend.services.authorization_service import AuthorizationService
from werkzeug.wrappers.response import Response
# from keycloak.uma_permissions import AuthStatus # noqa: F401
def get_keycloak_args() -> tuple:
@ -48,10 +48,10 @@ class PublicAuthenticationService:
Used during development to make testing easy.
"""
def logout(self, id_token: str , redirect_url: str | None = None) -> Response:
def logout(self, id_token: str, redirect_url: str | None = None) -> Response:
"""Logout."""
if redirect_url is None:
redirect_url = '/'
redirect_url = "/"
return_redirect_url = "http://localhost:7000/v1.0/logout_return"
(
keycloak_server_url,
@ -190,7 +190,7 @@ class PublicAuthenticationService:
if public_response.status_code == 200:
public_token = json.loads(public_response.text)
if "access_token" in public_token:
access_token: dict = public_token['access_token']
access_token: dict = public_token["access_token"]
return access_token
raise ApiError(
code="no_public_access_token",

View File

@ -48,9 +48,11 @@ class AuthorizationService:
request_response = requests.get(request_url, headers=headers)
except Exception as e:
current_app.logger.error(f"Exception in get_user_info_from_id_token: {e}")
raise ApiError(code='token_error',
message=f"Exception in get_user_info_from_id_token: {e}",
status_code=401)
raise ApiError(
code="token_error",
message=f"Exception in get_user_info_from_id_token: {e}",
status_code=401,
)
if request_response.status_code == 401:
raise ApiError(
@ -60,9 +62,11 @@ class AuthorizationService:
user_info: dict = json.loads(request_response.text)
return user_info
raise ApiError(code='user_info_error',
message=f"Cannot get user info in get_user_info_from_id_token",
status_code=401)
raise ApiError(
code="user_info_error",
message=f"Cannot get user info in get_user_info_from_id_token",
status_code=401,
)
# def refresh_token(self, token: str) -> str:
# """Refresh_token."""

View File

@ -6,7 +6,6 @@ from flask import current_app
from flask import g
from flask_bpmn.api.api_error import ApiError
from flask_bpmn.models.db import db
from sqlalchemy.exc import IntegrityError
from spiffworkflow_backend.models.principal import PrincipalModel
from spiffworkflow_backend.models.user import AdminSessionModel
@ -16,9 +15,16 @@ from spiffworkflow_backend.models.user import UserModel
class UserService:
"""Provides common tools for working with users."""
def create_user(self, service: str, service_id: str, name: str|None=None, username: str|None=None, email: str|None=None) -> UserModel:
def create_user(
self,
service: str,
service_id: str,
name: str | None = None,
username: str | None = None,
email: str | None = None,
) -> UserModel:
"""Create_user."""
user_model: UserModel|None = (
user_model: UserModel | None = (
UserModel.query.filter(UserModel.service == service)
.filter(UserModel.service_id == service_id)
.first()
@ -238,7 +244,9 @@ class UserService:
def create_principal(self, user_id: int) -> PrincipalModel:
"""Create_principal."""
principal: PrincipalModel | None = PrincipalModel.query.filter_by(user_id=user_id).first()
principal: PrincipalModel | None = PrincipalModel.query.filter_by(
user_id=user_id
).first()
if principal is None:
principal = PrincipalModel(user_id=user_id)
db.session.add(principal)

View File

@ -1,9 +1,6 @@
"""Test_authorization."""
from flask.app import Flask
from tests.spiffworkflow_backend.integration.base_test import BaseTest
from spiffworkflow_backend.services.authorization_service import AuthorizationService
class TestAuthorization(BaseTest):
"""TestAuthorization."""