created common method to check whether an api method should have auth w/ burnettk

This commit is contained in:
jasquat 2022-10-13 16:02:12 -04:00
parent c955335d0e
commit 94d50efb1f
4 changed files with 82 additions and 84 deletions

View File

@ -17,10 +17,10 @@ from flask_mail import Mail # type: ignore
import spiffworkflow_backend.load_database_models # noqa: F401
from spiffworkflow_backend.config import setup_config
from spiffworkflow_backend.routes.admin_blueprint.admin_blueprint import admin_blueprint
from spiffworkflow_backend.routes.process_api_blueprint import check_for_permission
from spiffworkflow_backend.routes.process_api_blueprint import process_api_blueprint
from spiffworkflow_backend.routes.user import verify_token
from spiffworkflow_backend.routes.user_blueprint import user_blueprint
from spiffworkflow_backend.services.authorization_service import AuthorizationService
from spiffworkflow_backend.services.background_processing_service import (
BackgroundProcessingService,
)
@ -116,7 +116,7 @@ def create_app() -> flask.app.Flask:
configure_sentry(app)
app.before_request(verify_token)
app.before_request(check_for_permission)
app.before_request(AuthorizationService.check_for_permission)
return app # type: ignore

View File

@ -87,77 +87,6 @@ class ReactJsonSchemaSelectOption(TypedDict):
process_api_blueprint = Blueprint("process_api", __name__)
authorization_exclusion_list = ["status"]
# TODO: we can add the before_request to the blueprint
# directly when we switch over from connexion routes
# to blueprint routes
# @process_api_blueprint.before_request
def check_for_permission() -> None:
"""Check_for_permission."""
# print("WE CALL1")
if request.method == "OPTIONS":
return None
# print("WE CALL")
# return None
if not request.endpoint:
raise ApiError(
error_code="request_endpoint_not_found",
message="Could not find the endpong from the rquest.",
status_code=500,
)
api_view_function = current_app.view_functions[request.endpoint]
if (
api_view_function
and api_view_function.__name__.startswith("login")
or api_view_function.__name__.startswith("logout")
):
return None
if not hasattr(g, "user"):
raise ApiError(
error_code="user_not_logged_in",
message="User is not logged in. Please log in",
status_code=401,
)
if (
api_view_function
and api_view_function.__name__ not in authorization_exclusion_list
):
permission_string = get_permission_from_request_method()
if permission_string:
has_permission = AuthorizationService.user_has_permission(
user=g.user,
permission=permission_string,
target_uri=request.path,
)
if has_permission:
return None
raise ApiError(
error_code="unauthorized",
message="User is not authorized to perform requested action.",
status_code=403,
)
def get_permission_from_request_method() -> Optional[str]:
"""Get_permission_from_request_method."""
request_method_mapper = {
"POST": "create",
"GET": "read",
"PUT": "update",
"DELETE": "delete",
}
if request.method in request_method_mapper:
return request_method_mapper[request.method]
return None
def status() -> flask.wrappers.Response:

View File

@ -45,18 +45,9 @@ def verify_token(
ApiError: If not on production and token is not valid, returns an 'invalid_token' 403 error.
If on production and user is not authenticated, returns a 'no_user' 403 error.
"""
if request.method == "OPTIONS":
if AuthorizationService.should_disable_auth_for_request():
return None
if request.endpoint:
api_view_function = current_app.view_functions[request.endpoint]
if (
api_view_function
and api_view_function.__name__.startswith("login")
or api_view_function.__name__.startswith("logout")
):
return None
if not token and "Authorization" in request.headers:
token = request.headers["Authorization"].removeprefix("Bearer ")

View File

@ -5,7 +5,7 @@ from typing import Union
import jwt
import yaml
from flask import current_app
from flask import current_app, g, request
from flask_bpmn.api.api_error import ApiError
from flask_bpmn.models.db import db
from sqlalchemy import text
@ -184,6 +184,84 @@ class AuthorizationService:
db.session.commit()
return permission_assignment
@classmethod
def should_disable_auth_for_request(cls) -> bool:
authorization_exclusion_list = ["status"]
if request.method == "OPTIONS":
return True
if not request.endpoint:
raise ApiError(
error_code="request_endpoint_not_found",
message="Could not find the endpong from the rquest.",
status_code=500,
)
api_view_function = current_app.view_functions[request.endpoint]
if (
api_view_function
and api_view_function.__name__.startswith("login")
or api_view_function.__name__.startswith("logout")
or api_view_function.__name__ in authorization_exclusion_list
):
return True
return False
@classmethod
def get_permission_from_request_method(cls) -> Optional[str]:
"""Get_permission_from_request_method."""
request_method_mapper = {
"POST": "create",
"GET": "read",
"PUT": "update",
"DELETE": "delete",
}
if request.method in request_method_mapper:
return request_method_mapper[request.method]
return None
# TODO: we can add the before_request to the blueprint
# directly when we switch over from connexion routes
# to blueprint routes
# @process_api_blueprint.before_request
@classmethod
def check_for_permission(cls) -> None:
"""Check_for_permission."""
if cls.should_disable_auth_for_request():
return None
if not hasattr(g, "user"):
raise ApiError(
error_code="user_not_logged_in",
message="User is not logged in. Please log in",
status_code=401,
)
api_view_function = current_app.view_functions[request.endpoint]
if (
api_view_function
):
permission_string = cls.get_permission_from_request_method()
if permission_string:
has_permission = AuthorizationService.user_has_permission(
user=g.user,
permission=permission_string,
target_uri=request.path,
)
if has_permission:
return None
raise ApiError(
error_code="unauthorized",
message="User is not authorized to perform requested action.",
status_code=403,
)
# def refresh_token(self, token: str) -> str:
# """Refresh_token."""
# # if isinstance(token, str):