refactor get_user_info_from_id_token into authn service. w/ mike

This commit is contained in:
burnettk 2022-10-07 16:25:37 -04:00
parent 37441b8bef
commit 9bcd5a73a2
4 changed files with 57 additions and 55 deletions

View File

@ -17,7 +17,6 @@ from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.authentication_service import (
PublicAuthenticationService,
)
from spiffworkflow_backend.services.authorization_service import AuthorizationService
from spiffworkflow_backend.services.user_service import UserService
"""
@ -59,7 +58,7 @@ def verify_token(token: Optional[str] = None) -> Dict[str, Optional[Union[str, i
elif "iss" in decoded_token.keys():
try:
user_info = AuthorizationService().get_user_info_from_id_token(
user_info = PublicAuthenticationService.get_user_info_from_id_token(
token
)
except ApiError as ae:
@ -142,12 +141,12 @@ def verify_token(token: Optional[str] = None) -> Dict[str, Optional[Union[str, i
def validate_scope(token: Any) -> bool:
"""Validate_scope."""
print("validate_scope")
# token = AuthorizationService().refresh_token(token)
# user_info = AuthorizationService().get_user_info_from_public_access_token(token)
# bearer_token = AuthorizationService().get_bearer_token(token)
# permission = AuthorizationService().get_permission_by_basic_token(token)
# permissions = AuthorizationService().get_permissions_by_token_for_resource_and_scope(token)
# introspection = AuthorizationService().introspect_token(basic_token)
# token = PublicAuthenticationService.refresh_token(token)
# user_info = PublicAuthenticationService.get_user_info_from_public_access_token(token)
# bearer_token = PublicAuthenticationService.get_bearer_token(token)
# permission = PublicAuthenticationService.get_permission_by_basic_token(token)
# permissions = PublicAuthenticationService.get_permissions_by_token_for_resource_and_scope(token)
# introspection = PublicAuthenticationService.introspect_token(basic_token)
return True
@ -218,7 +217,7 @@ def login_return(code: str, state: str, session_state: str) -> Optional[Response
id_token = id_token_object["id_token"]
if PublicAuthenticationService.validate_id_token(id_token):
user_info = AuthorizationService().get_user_info_from_id_token(
user_info = PublicAuthenticationService.get_user_info_from_id_token(
id_token_object["access_token"]
)
if user_info and "error" not in user_info:

View File

@ -12,6 +12,8 @@ from flask import redirect
from flask_bpmn.api.api_error import ApiError
from werkzeug.wrappers.response import Response
from spiffworkflow_backend.services.authorization_service import AuthorizationService
def get_open_id_args() -> tuple:
"""Get_open_id_args."""
@ -44,6 +46,47 @@ class PublicAuthenticationService:
Used during development to make testing easy.
"""
@staticmethod
def get_user_info_from_id_token(token: str) -> dict:
"""This seems to work with basic tokens too."""
(
open_id_server_url,
open_id_client_id,
open_id_realm_name,
open_id_client_secret_key,
) = AuthorizationService.get_open_id_args()
# backend_basic_auth_string = f"{open_id_client_id}:{open_id_client_secret_key}"
# backend_basic_auth_bytes = bytes(backend_basic_auth_string, encoding="ascii")
# backend_basic_auth = base64.b64encode(backend_basic_auth_bytes)
headers = {"Authorization": f"Bearer {token}"}
request_url = f"{open_id_server_url}/realms/{open_id_realm_name}/protocol/openid-connect/userinfo"
try:
request_response = requests.get(request_url, headers=headers)
except Exception as e:
current_app.logger.error(f"Exception in get_user_info_from_id_token: {e}")
raise ApiError(
code="token_error",
message=f"Exception in get_user_info_from_id_token: {e}",
status_code=401,
) from e
if request_response.status_code == 401:
raise ApiError(
code="invalid_token", message="Please login", status_code=401
)
elif request_response.status_code == 200:
user_info: dict = json.loads(request_response.text)
return user_info
raise ApiError(
code="user_info_error",
message="Cannot get user info in get_user_info_from_id_token",
status_code=401,
)
def get_backend_url(self) -> str:
"""Get_backend_url."""
return str(current_app.config["SPIFFWORKFLOW_BACKEND_URL"])

View File

@ -28,46 +28,6 @@ class AuthorizationService:
open_id_client_secret_key,
)
def get_user_info_from_id_token(self, token: str) -> dict:
"""This seems to work with basic tokens too."""
(
open_id_server_url,
open_id_client_id,
open_id_realm_name,
open_id_client_secret_key,
) = AuthorizationService.get_open_id_args()
# backend_basic_auth_string = f"{open_id_client_id}:{open_id_client_secret_key}"
# backend_basic_auth_bytes = bytes(backend_basic_auth_string, encoding="ascii")
# backend_basic_auth = base64.b64encode(backend_basic_auth_bytes)
headers = {"Authorization": f"Bearer {token}"}
request_url = f"{open_id_server_url}/realms/{open_id_realm_name}/protocol/openid-connect/userinfo"
try:
request_response = requests.get(request_url, headers=headers)
except Exception as e:
current_app.logger.error(f"Exception in get_user_info_from_id_token: {e}")
raise ApiError(
code="token_error",
message=f"Exception in get_user_info_from_id_token: {e}",
status_code=401,
) from e
if request_response.status_code == 401:
raise ApiError(
code="invalid_token", message="Please login", status_code=401
)
elif request_response.status_code == 200:
user_info: dict = json.loads(request_response.text)
return user_info
raise ApiError(
code="user_info_error",
message="Cannot get user info in get_user_info_from_id_token",
status_code=401,
)
# def refresh_token(self, token: str) -> str:
# """Refresh_token."""
# # if isinstance(token, str):

View File

@ -9,7 +9,7 @@ class TestAuthorization(BaseTest):
# """Test_get_bearer_token."""
# for user_id in ("user_1", "user_2", "admin_1", "admin_2"):
# public_access_token = self.get_public_access_token(user_id, user_id)
# bearer_token = AuthorizationService().get_bearer_token(public_access_token)
# bearer_token = PublicAuthenticationService.get_bearer_token(public_access_token)
# assert isinstance(public_access_token, str)
# assert isinstance(bearer_token, dict)
# assert "access_token" in bearer_token
@ -25,7 +25,7 @@ class TestAuthorization(BaseTest):
# """Test_get_user_info_from_public_access_token."""
# for user_id in ("user_1", "user_2", "admin_1", "admin_2"):
# public_access_token = self.get_public_access_token(user_id, user_id)
# user_info = AuthorizationService().get_user_info_from_id_token(
# user_info = PublicAuthenticationService.get_user_info_from_id_token(
# public_access_token
# )
# assert "sub" in user_info
@ -46,7 +46,7 @@ class TestAuthorization(BaseTest):
# ) = self.get_keycloak_constants(app)
# for user_id in ("user_1", "user_2", "admin_1", "admin_2"):
# basic_token = self.get_public_access_token(user_id, user_id)
# introspection = AuthorizationService().introspect_token(basic_token)
# introspection = PublicAuthenticationService.introspect_token(basic_token)
# assert isinstance(introspection, dict)
# assert introspection["typ"] == "Bearer"
# assert introspection["preferred_username"] == user_id
@ -80,7 +80,7 @@ class TestAuthorization(BaseTest):
# for user_id in ("user_1", "user_2", "admin_1", "admin_2"):
# output[user_id] = {}
# basic_token = self.get_public_access_token(user_id, user_id)
# permissions = AuthorizationService().get_permission_by_basic_token(
# permissions = PublicAuthenticationService.get_permission_by_basic_token(
# basic_token
# )
# if isinstance(permissions, list):
@ -136,7 +136,7 @@ class TestAuthorization(BaseTest):
# for resource in resources:
# output[user_id][resource] = {}
# for scope in "instantiate", "read", "update", "delete":
# auth_status = AuthorizationService().get_auth_status_for_resource_and_scope_by_token(
# auth_status = PublicAuthenticationService.get_auth_status_for_resource_and_scope_by_token(
# basic_token, resource, scope
# )
# output[user_id][resource][scope] = auth_status
@ -152,7 +152,7 @@ class TestAuthorization(BaseTest):
# for resource in resource_names:
# output[user_id][resource] = {}
# for scope in "instantiate", "read", "update", "delete":
# permissions = AuthorizationService().get_permissions_by_token_for_resource_and_scope(
# permissions = PublicAuthenticationService.get_permissions_by_token_for_resource_and_scope(
# basic_token, resource, scope
# )
# output[user_id][resource][scope] = permissions