mirror of
https://github.com/sartography/spiffworkflow-backend.git
synced 2025-02-23 21:08:18 +00:00
keycloak -> open_id
also, assert we have a valid service when adding a user
This commit is contained in:
parent
18013f9456
commit
a2703d6c6e
@ -13,12 +13,12 @@ CORS_ALLOW_ORIGINS = re.split(
|
||||
r",\s*", environ.get("CORS_ALLOW_ORIGINS", default=CORS_DEFAULT)
|
||||
)
|
||||
|
||||
# Keycloak server
|
||||
KEYCLOAK_SERVER_URL = environ.get(
|
||||
"KEYCLOAK_SERVER_URL", default="http://localhost:7002"
|
||||
# Open ID server
|
||||
OPEN_ID_SERVER_URL = environ.get(
|
||||
"OPEN_ID_SERVER_URL", default="http://localhost:7002"
|
||||
)
|
||||
KEYCLOAK_CLIENT_ID = environ.get("KEYCLOAK_CLIENT_ID", default="spiffworkflow-backend")
|
||||
KEYCLOAK_REALM_NAME = environ.get("KEYCLOAK_REALM_NAME", default="spiffworkflow")
|
||||
KEYCLOAK_CLIENT_SECRET_KEY = environ.get(
|
||||
"KEYCLOAK_CLIENT_SECRET_KEY", default="JXeQExm0JhQPLumgHtIIqf52bDalHz0q"
|
||||
OPEN_ID_CLIENT_ID = environ.get("OPEN_ID_CLIENT_ID", default="spiffworkflow-backend")
|
||||
OPEN_ID_REALM_NAME = environ.get("OPEN_ID_REALM_NAME", default="spiffworkflow")
|
||||
OPEN_ID_CLIENT_SECRET_KEY = environ.get(
|
||||
"OPEN_ID_CLIENT_SECRET_KEY", default="JXeQExm0JhQPLumgHtIIqf52bDalHz0q"
|
||||
) # noqa: S105
|
||||
|
@ -6,9 +6,11 @@ from flask_bpmn.models.db import db
|
||||
from flask_bpmn.models.db import SpiffworkflowBaseDBModel
|
||||
from marshmallow import Schema
|
||||
from sqlalchemy.orm import relationship
|
||||
from sqlalchemy.orm import validates
|
||||
|
||||
from spiffworkflow_backend.models.group import GroupModel
|
||||
from spiffworkflow_backend.models.user_group_assignment import UserGroupAssignmentModel
|
||||
from spiffworkflow_backend.services.authentication_service import AuthenticationProviderTypes
|
||||
|
||||
|
||||
class UserModel(SpiffworkflowBaseDBModel):
|
||||
@ -31,10 +33,10 @@ class UserModel(SpiffworkflowBaseDBModel):
|
||||
overlaps="user_group_assignments,users",
|
||||
)
|
||||
|
||||
# @validates('service')
|
||||
# def validate_service(self, key, value):
|
||||
# assert value != ''
|
||||
# return True
|
||||
@validates('service')
|
||||
def validate_service(self, key, value):
|
||||
assert value in AuthenticationProviderTypes._member_names_
|
||||
return value
|
||||
|
||||
def encode_auth_token(self) -> str:
|
||||
"""Generate the Auth Token.
|
||||
|
@ -75,7 +75,7 @@ def verify_token(token: Optional[str] = None) -> Dict[str, Optional[Union[str, i
|
||||
user_info is not None and "error" not in user_info
|
||||
): # not sure what to test yet
|
||||
user_model = (
|
||||
UserModel.query.filter(UserModel.service == "keycloak")
|
||||
UserModel.query.filter(UserModel.service == "open_id")
|
||||
.filter(UserModel.service_id == user_info["sub"])
|
||||
.first()
|
||||
)
|
||||
@ -83,7 +83,7 @@ def verify_token(token: Optional[str] = None) -> Dict[str, Optional[Union[str, i
|
||||
# Do we ever get here any more, now that we have login_return method?
|
||||
current_app.logger.debug("create_user in verify_token")
|
||||
user_model = UserService().create_user(
|
||||
service="keycloak",
|
||||
service="open_id",
|
||||
service_id=user_info["sub"],
|
||||
name=user_info["name"],
|
||||
username=user_info["preferred_username"],
|
||||
@ -197,7 +197,7 @@ def login(redirect_url: str = "/") -> Response:
|
||||
def login_return(code: str, state: str, session_state: str) -> Optional[Response]:
|
||||
"""Login_return."""
|
||||
state_dict = ast.literal_eval(
|
||||
base64.b64decode(ast.literal_eval(state)).decode("utf-8")
|
||||
base64.b64decode(state).decode("utf-8")
|
||||
)
|
||||
state_redirect_url = state_dict["redirect_url"]
|
||||
|
||||
@ -210,7 +210,7 @@ def login_return(code: str, state: str, session_state: str) -> Optional[Response
|
||||
)
|
||||
if user_info and "error" not in user_info:
|
||||
user_model = (
|
||||
UserModel.query.filter(UserModel.service == "keycloak")
|
||||
UserModel.query.filter(UserModel.service == "open_id")
|
||||
.filter(UserModel.service_id == user_info["sub"])
|
||||
.first()
|
||||
)
|
||||
@ -224,7 +224,7 @@ def login_return(code: str, state: str, session_state: str) -> Optional[Response
|
||||
if "email" in user_info:
|
||||
email = user_info["email"]
|
||||
user_model = UserService().create_user(
|
||||
service="keycloak",
|
||||
service="open_id",
|
||||
service_id=user_info["sub"],
|
||||
name=name,
|
||||
username=username,
|
||||
|
@ -13,26 +13,26 @@ from flask_bpmn.api.api_error import ApiError
|
||||
from werkzeug.wrappers.response import Response
|
||||
|
||||
|
||||
def get_keycloak_args() -> tuple:
|
||||
"""Get_keycloak_args."""
|
||||
keycloak_server_url = current_app.config["KEYCLOAK_SERVER_URL"]
|
||||
keycloak_client_id = current_app.config["KEYCLOAK_CLIENT_ID"]
|
||||
keycloak_realm_name = current_app.config["KEYCLOAK_REALM_NAME"]
|
||||
keycloak_client_secret_key = current_app.config[
|
||||
"KEYCLOAK_CLIENT_SECRET_KEY"
|
||||
def get_open_id_args() -> tuple:
|
||||
"""Get_open_id_args."""
|
||||
open_id_server_url = current_app.config["OPEN_ID_SERVER_URL"]
|
||||
open_id_client_id = current_app.config["OPEN_ID_CLIENT_ID"]
|
||||
open_id_realm_name = current_app.config["OPEN_ID_REALM_NAME"]
|
||||
open_id_client_secret_key = current_app.config[
|
||||
"OPEN_ID_CLIENT_SECRET_KEY"
|
||||
] # noqa: S105
|
||||
return (
|
||||
keycloak_server_url,
|
||||
keycloak_client_id,
|
||||
keycloak_realm_name,
|
||||
keycloak_client_secret_key,
|
||||
open_id_server_url,
|
||||
open_id_client_id,
|
||||
open_id_realm_name,
|
||||
open_id_client_secret_key,
|
||||
)
|
||||
|
||||
|
||||
class AuthenticationServiceProviders(enum.Enum):
|
||||
class AuthenticationProviderTypes(enum.Enum):
|
||||
"""AuthenticationServiceProviders."""
|
||||
|
||||
keycloak = "keycloak"
|
||||
open_id = "open_id"
|
||||
internal = "internal"
|
||||
|
||||
|
||||
@ -40,7 +40,7 @@ class PublicAuthenticationService:
|
||||
"""PublicAuthenticationService."""
|
||||
|
||||
"""Not sure where/if this ultimately lives.
|
||||
It uses a separate public keycloak client: spiffworkflow-frontend
|
||||
It uses a separate public open_id client: spiffworkflow-frontend
|
||||
Used during development to make testing easy.
|
||||
"""
|
||||
|
||||
@ -50,13 +50,13 @@ class PublicAuthenticationService:
|
||||
redirect_url = "/"
|
||||
return_redirect_url = "http://localhost:7000/v1.0/logout_return"
|
||||
(
|
||||
keycloak_server_url,
|
||||
keycloak_client_id,
|
||||
keycloak_realm_name,
|
||||
keycloak_client_secret_key,
|
||||
) = get_keycloak_args()
|
||||
open_id_server_url,
|
||||
open_id_client_id,
|
||||
open_id_realm_name,
|
||||
open_id_client_secret_key,
|
||||
) = get_open_id_args()
|
||||
request_url = (
|
||||
f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/logout?"
|
||||
f"{open_id_server_url}/realms/{open_id_realm_name}/protocol/openid-connect/logout?"
|
||||
+ f"post_logout_redirect_uri={return_redirect_url}&"
|
||||
+ f"id_token_hint={id_token}"
|
||||
)
|
||||
@ -72,17 +72,17 @@ class PublicAuthenticationService:
|
||||
def get_login_redirect_url(self, state: str) -> str:
|
||||
"""Get_login_redirect_url."""
|
||||
(
|
||||
keycloak_server_url,
|
||||
keycloak_client_id,
|
||||
keycloak_realm_name,
|
||||
keycloak_client_secret_key,
|
||||
) = get_keycloak_args()
|
||||
open_id_server_url,
|
||||
open_id_client_id,
|
||||
open_id_realm_name,
|
||||
open_id_client_secret_key,
|
||||
) = get_open_id_args()
|
||||
return_redirect_url = "http://localhost:7000/v1.0/login_return"
|
||||
login_redirect_url = (
|
||||
f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/auth?"
|
||||
f"{open_id_server_url}/realms/{open_id_realm_name}/protocol/openid-connect/auth?"
|
||||
+ f"state={state}&"
|
||||
+ "response_type=code&"
|
||||
+ f"client_id={keycloak_client_id}&"
|
||||
+ f"client_id={open_id_client_id}&"
|
||||
+ "scope=openid&"
|
||||
+ f"redirect_uri={return_redirect_url}"
|
||||
)
|
||||
@ -91,13 +91,13 @@ class PublicAuthenticationService:
|
||||
def get_id_token_object(self, code: str) -> dict:
|
||||
"""Get_id_token_object."""
|
||||
(
|
||||
keycloak_server_url,
|
||||
keycloak_client_id,
|
||||
keycloak_realm_name,
|
||||
keycloak_client_secret_key,
|
||||
) = get_keycloak_args()
|
||||
open_id_server_url,
|
||||
open_id_client_id,
|
||||
open_id_realm_name,
|
||||
open_id_client_secret_key,
|
||||
) = get_open_id_args()
|
||||
|
||||
backend_basic_auth_string = f"{keycloak_client_id}:{keycloak_client_secret_key}"
|
||||
backend_basic_auth_string = f"{open_id_client_id}:{open_id_client_secret_key}"
|
||||
backend_basic_auth_bytes = bytes(backend_basic_auth_string, encoding="ascii")
|
||||
backend_basic_auth = base64.b64encode(backend_basic_auth_bytes)
|
||||
headers = {
|
||||
@ -111,7 +111,7 @@ class PublicAuthenticationService:
|
||||
"redirect_uri": "http://localhost:7000/v1.0/login_return",
|
||||
}
|
||||
|
||||
request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token"
|
||||
request_url = f"{open_id_server_url}/realms/{open_id_realm_name}/protocol/openid-connect/token"
|
||||
|
||||
response = requests.post(request_url, data=data, headers=headers)
|
||||
id_token_object: dict = json.loads(response.text)
|
||||
@ -123,11 +123,11 @@ class PublicAuthenticationService:
|
||||
valid = True
|
||||
now = time.time()
|
||||
(
|
||||
keycloak_server_url,
|
||||
keycloak_client_id,
|
||||
keycloak_realm_name,
|
||||
keycloak_client_secret_key,
|
||||
) = get_keycloak_args()
|
||||
open_id_server_url,
|
||||
open_id_client_id,
|
||||
open_id_realm_name,
|
||||
open_id_client_secret_key,
|
||||
) = get_open_id_args()
|
||||
try:
|
||||
decoded_token = jwt.decode(id_token, options={"verify_signature": False})
|
||||
except Exception as e:
|
||||
@ -136,16 +136,16 @@ class PublicAuthenticationService:
|
||||
) from e
|
||||
if (
|
||||
decoded_token["iss"]
|
||||
!= f"{keycloak_server_url}/realms/{keycloak_realm_name}"
|
||||
!= f"{open_id_server_url}/realms/{open_id_realm_name}"
|
||||
):
|
||||
valid = False
|
||||
elif (
|
||||
keycloak_client_id not in decoded_token["aud"]
|
||||
open_id_client_id not in decoded_token["aud"]
|
||||
and "account" not in decoded_token["aud"]
|
||||
):
|
||||
valid = False
|
||||
elif "azp" in decoded_token and decoded_token["azp"] not in (
|
||||
keycloak_client_id,
|
||||
open_id_client_id,
|
||||
"account",
|
||||
):
|
||||
valid = False
|
||||
|
@ -13,37 +13,37 @@ class AuthorizationService:
|
||||
"""Determine whether a user has permission to perform their request."""
|
||||
|
||||
@staticmethod
|
||||
def get_keycloak_args() -> tuple:
|
||||
"""Get_keycloak_args."""
|
||||
keycloak_server_url = current_app.config["KEYCLOAK_SERVER_URL"]
|
||||
keycloak_client_id = current_app.config["KEYCLOAK_CLIENT_ID"]
|
||||
keycloak_realm_name = current_app.config["KEYCLOAK_REALM_NAME"]
|
||||
keycloak_client_secret_key = current_app.config[
|
||||
"KEYCLOAK_CLIENT_SECRET_KEY"
|
||||
def get_open_id_args() -> tuple:
|
||||
"""Get_open_id_args."""
|
||||
open_id_server_url = current_app.config["OPEN_ID_SERVER_URL"]
|
||||
open_id_client_id = current_app.config["OPEN_ID_CLIENT_ID"]
|
||||
open_id_realm_name = current_app.config["OPEN_ID_REALM_NAME"]
|
||||
open_id_client_secret_key = current_app.config[
|
||||
"OPEN_ID_CLIENT_SECRET_KEY"
|
||||
] # noqa: S105
|
||||
return (
|
||||
keycloak_server_url,
|
||||
keycloak_client_id,
|
||||
keycloak_realm_name,
|
||||
keycloak_client_secret_key,
|
||||
open_id_server_url,
|
||||
open_id_client_id,
|
||||
open_id_realm_name,
|
||||
open_id_client_secret_key,
|
||||
)
|
||||
|
||||
def get_user_info_from_id_token(self, token: str) -> dict:
|
||||
"""This seems to work with basic tokens too."""
|
||||
(
|
||||
keycloak_server_url,
|
||||
keycloak_client_id,
|
||||
keycloak_realm_name,
|
||||
keycloak_client_secret_key,
|
||||
) = AuthorizationService.get_keycloak_args()
|
||||
open_id_server_url,
|
||||
open_id_client_id,
|
||||
open_id_realm_name,
|
||||
open_id_client_secret_key,
|
||||
) = AuthorizationService.get_open_id_args()
|
||||
|
||||
# backend_basic_auth_string = f"{keycloak_client_id}:{keycloak_client_secret_key}"
|
||||
# backend_basic_auth_string = f"{open_id_client_id}:{open_id_client_secret_key}"
|
||||
# backend_basic_auth_bytes = bytes(backend_basic_auth_string, encoding="ascii")
|
||||
# backend_basic_auth = base64.b64encode(backend_basic_auth_bytes)
|
||||
|
||||
headers = {"Authorization": f"Bearer {token}"}
|
||||
|
||||
request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/userinfo"
|
||||
request_url = f"{open_id_server_url}/realms/{open_id_realm_name}/protocol/openid-connect/userinfo"
|
||||
try:
|
||||
request_response = requests.get(request_url, headers=headers)
|
||||
except Exception as e:
|
||||
@ -73,13 +73,13 @@ class AuthorizationService:
|
||||
# # if isinstance(token, str):
|
||||
# # token = eval(token)
|
||||
# (
|
||||
# keycloak_server_url,
|
||||
# keycloak_client_id,
|
||||
# keycloak_realm_name,
|
||||
# keycloak_client_secret_key,
|
||||
# ) = AuthorizationService.get_keycloak_args()
|
||||
# open_id_server_url,
|
||||
# open_id_client_id,
|
||||
# open_id_realm_name,
|
||||
# open_id_client_secret_key,
|
||||
# ) = AuthorizationService.get_open_id_args()
|
||||
# headers = {"Content-Type": "application/x-www-form-urlencoded"}
|
||||
# request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token"
|
||||
# request_url = f"{open_id_server_url}/realms/{open_id_realm_name}/protocol/openid-connect/token"
|
||||
# data = {
|
||||
# "grant_type": "refresh_token",
|
||||
# "client_id": "spiffworkflow-frontend",
|
||||
@ -93,13 +93,13 @@ class AuthorizationService:
|
||||
def get_bearer_token(self, basic_token: str) -> dict:
|
||||
"""Get_bearer_token."""
|
||||
(
|
||||
keycloak_server_url,
|
||||
keycloak_client_id,
|
||||
keycloak_realm_name,
|
||||
keycloak_client_secret_key,
|
||||
) = AuthorizationService.get_keycloak_args()
|
||||
open_id_server_url,
|
||||
open_id_client_id,
|
||||
open_id_realm_name,
|
||||
open_id_client_secret_key,
|
||||
) = AuthorizationService.get_open_id_args()
|
||||
|
||||
backend_basic_auth_string = f"{keycloak_client_id}:{keycloak_client_secret_key}"
|
||||
backend_basic_auth_string = f"{open_id_client_id}:{open_id_client_secret_key}"
|
||||
backend_basic_auth_bytes = bytes(backend_basic_auth_string, encoding="ascii")
|
||||
backend_basic_auth = base64.b64encode(backend_basic_auth_bytes)
|
||||
|
||||
@ -109,11 +109,11 @@ class AuthorizationService:
|
||||
}
|
||||
data = {
|
||||
"grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
|
||||
"client_id": keycloak_client_id,
|
||||
"client_id": open_id_client_id,
|
||||
"subject_token": basic_token,
|
||||
"audience": keycloak_client_id,
|
||||
"audience": open_id_client_id,
|
||||
}
|
||||
request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token"
|
||||
request_url = f"{open_id_server_url}/realms/{open_id_realm_name}/protocol/openid-connect/token"
|
||||
|
||||
backend_response = requests.post(request_url, headers=headers, data=data)
|
||||
# json_data = json.loads(backend_response.text)
|
||||
@ -154,11 +154,11 @@ class AuthorizationService:
|
||||
# def introspect_token(self, basic_token: str) -> dict:
|
||||
# """Introspect_token."""
|
||||
# (
|
||||
# keycloak_server_url,
|
||||
# keycloak_client_id,
|
||||
# keycloak_realm_name,
|
||||
# keycloak_client_secret_key,
|
||||
# ) = AuthorizationService.get_keycloak_args()
|
||||
# open_id_server_url,
|
||||
# open_id_client_id,
|
||||
# open_id_realm_name,
|
||||
# open_id_client_secret_key,
|
||||
# ) = AuthorizationService.get_open_id_args()
|
||||
#
|
||||
# bearer_token = AuthorizationService().get_bearer_token(basic_token)
|
||||
# auth_bearer_string = f"Bearer {bearer_token['access_token']}"
|
||||
@ -168,11 +168,11 @@ class AuthorizationService:
|
||||
# "Authorization": auth_bearer_string,
|
||||
# }
|
||||
# data = {
|
||||
# "client_id": keycloak_client_id,
|
||||
# "client_secret": keycloak_client_secret_key,
|
||||
# "client_id": open_id_client_id,
|
||||
# "client_secret": open_id_client_secret_key,
|
||||
# "token": basic_token,
|
||||
# }
|
||||
# request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token/introspect"
|
||||
# request_url = f"{open_id_server_url}/realms/{open_id_realm_name}/protocol/openid-connect/token/introspect"
|
||||
#
|
||||
# introspect_response = requests.post(request_url, headers=headers, data=data)
|
||||
# introspection = json.loads(introspect_response.text)
|
||||
@ -182,11 +182,11 @@ class AuthorizationService:
|
||||
# def get_permission_by_basic_token(self, basic_token: dict) -> list:
|
||||
# """Get_permission_by_basic_token."""
|
||||
# (
|
||||
# keycloak_server_url,
|
||||
# keycloak_client_id,
|
||||
# keycloak_realm_name,
|
||||
# keycloak_client_secret_key,
|
||||
# ) = AuthorizationService.get_keycloak_args()
|
||||
# open_id_server_url,
|
||||
# open_id_client_id,
|
||||
# open_id_realm_name,
|
||||
# open_id_client_secret_key,
|
||||
# ) = AuthorizationService.get_open_id_args()
|
||||
#
|
||||
# # basic_token = AuthorizationService().refresh_token(basic_token)
|
||||
# # bearer_token = AuthorizationService().get_bearer_token(basic_token['access_token'])
|
||||
@ -199,14 +199,14 @@ class AuthorizationService:
|
||||
# "Authorization": auth_bearer_string,
|
||||
# }
|
||||
# data = {
|
||||
# "client_id": keycloak_client_id,
|
||||
# "client_secret": keycloak_client_secret_key,
|
||||
# "client_id": open_id_client_id,
|
||||
# "client_secret": open_id_client_secret_key,
|
||||
# "grant_type": "urn:ietf:params:oauth:grant-type:uma-ticket",
|
||||
# "response_mode": "permissions",
|
||||
# "audience": keycloak_client_id,
|
||||
# "audience": open_id_client_id,
|
||||
# "response_include_resource_name": True,
|
||||
# }
|
||||
# request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token"
|
||||
# request_url = f"{open_id_server_url}/realms/{open_id_realm_name}/protocol/openid-connect/token"
|
||||
# permission_response = requests.post(request_url, headers=headers, data=data)
|
||||
# permission = json.loads(permission_response.text)
|
||||
# return permission
|
||||
@ -216,11 +216,11 @@ class AuthorizationService:
|
||||
# ) -> str:
|
||||
# """Get_auth_status_for_resource_and_scope_by_token."""
|
||||
# (
|
||||
# keycloak_server_url,
|
||||
# keycloak_client_id,
|
||||
# keycloak_realm_name,
|
||||
# keycloak_client_secret_key,
|
||||
# ) = AuthorizationService.get_keycloak_args()
|
||||
# open_id_server_url,
|
||||
# open_id_client_id,
|
||||
# open_id_realm_name,
|
||||
# open_id_client_secret_key,
|
||||
# ) = AuthorizationService.get_open_id_args()
|
||||
#
|
||||
# # basic_token = AuthorizationService().refresh_token(basic_token)
|
||||
# bearer_token = AuthorizationService().get_bearer_token(basic_token)
|
||||
@ -231,14 +231,14 @@ class AuthorizationService:
|
||||
# "Authorization": auth_bearer_string,
|
||||
# }
|
||||
# data = {
|
||||
# "client_id": keycloak_client_id,
|
||||
# "client_secret": keycloak_client_secret_key,
|
||||
# "client_id": open_id_client_id,
|
||||
# "client_secret": open_id_client_secret_key,
|
||||
# "grant_type": "urn:ietf:params:oauth:grant-type:uma-ticket",
|
||||
# "permission": f"{resource}#{scope}",
|
||||
# "response_mode": "permissions",
|
||||
# "audience": keycloak_client_id,
|
||||
# "audience": open_id_client_id,
|
||||
# }
|
||||
# request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token"
|
||||
# request_url = f"{open_id_server_url}/realms/{open_id_realm_name}/protocol/openid-connect/token"
|
||||
# auth_response = requests.post(request_url, headers=headers, data=data)
|
||||
#
|
||||
# print("get_auth_status_for_resource_and_scope_by_token")
|
||||
@ -250,11 +250,11 @@ class AuthorizationService:
|
||||
# ) -> str:
|
||||
# """Get_permissions_by_token_for_resource_and_scope."""
|
||||
# (
|
||||
# keycloak_server_url,
|
||||
# keycloak_client_id,
|
||||
# keycloak_realm_name,
|
||||
# keycloak_client_secret_key,
|
||||
# ) = AuthorizationService.get_keycloak_args()
|
||||
# open_id_server_url,
|
||||
# open_id_client_id,
|
||||
# open_id_realm_name,
|
||||
# open_id_client_secret_key,
|
||||
# ) = AuthorizationService.get_open_id_args()
|
||||
#
|
||||
# # basic_token = AuthorizationService().refresh_token(basic_token)
|
||||
# # bearer_token = AuthorizationService().get_bearer_token(basic_token['access_token'])
|
||||
@ -271,15 +271,15 @@ class AuthorizationService:
|
||||
# if scope is not None and scope != '':
|
||||
# permision += "#" + scope
|
||||
# data = {
|
||||
# "client_id": keycloak_client_id,
|
||||
# "client_secret": keycloak_client_secret_key,
|
||||
# "client_id": open_id_client_id,
|
||||
# "client_secret": open_id_client_secret_key,
|
||||
# "grant_type": "urn:ietf:params:oauth:grant-type:uma-ticket",
|
||||
# "response_mode": "permissions",
|
||||
# "permission": permision,
|
||||
# "audience": keycloak_client_id,
|
||||
# "audience": open_id_client_id,
|
||||
# "response_include_resource_name": True,
|
||||
# }
|
||||
# request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token"
|
||||
# request_url = f"{open_id_server_url}/realms/{open_id_realm_name}/protocol/openid-connect/token"
|
||||
# permission_response = requests.post(request_url, headers=headers, data=data)
|
||||
# permission: str = json.loads(permission_response.text)
|
||||
# return permission
|
||||
@ -287,11 +287,11 @@ class AuthorizationService:
|
||||
# def get_resource_set(self, public_access_token, uri):
|
||||
# """Get_resource_set."""
|
||||
# (
|
||||
# keycloak_server_url,
|
||||
# keycloak_client_id,
|
||||
# keycloak_realm_name,
|
||||
# keycloak_client_secret_key,
|
||||
# ) = AuthorizationService.get_keycloak_args()
|
||||
# open_id_server_url,
|
||||
# open_id_client_id,
|
||||
# open_id_realm_name,
|
||||
# open_id_client_secret_key,
|
||||
# ) = AuthorizationService.get_open_id_args()
|
||||
# bearer_token = AuthorizationService().get_bearer_token(public_access_token)
|
||||
# auth_bearer_string = f"Bearer {bearer_token['access_token']}"
|
||||
# headers = {
|
||||
@ -307,7 +307,7 @@ class AuthorizationService:
|
||||
# }
|
||||
#
|
||||
# # f"matchingUri=true&deep=true&max=-1&exactName=false&uri={URI_TO_TEST_AGAINST}"
|
||||
# request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/authz/protection/resource_set"
|
||||
# request_url = f"{open_id_server_url}/realms/{open_id_realm_name}/authz/protection/resource_set"
|
||||
# response = requests.get(request_url, headers=headers, data=data)
|
||||
#
|
||||
# print("get_resource_set")
|
||||
@ -316,11 +316,11 @@ class AuthorizationService:
|
||||
"""Get_permission_by_token."""
|
||||
# TODO: Write a test for this
|
||||
(
|
||||
keycloak_server_url,
|
||||
keycloak_client_id,
|
||||
keycloak_realm_name,
|
||||
keycloak_client_secret_key,
|
||||
) = AuthorizationService.get_keycloak_args()
|
||||
open_id_server_url,
|
||||
open_id_client_id,
|
||||
open_id_realm_name,
|
||||
open_id_client_secret_key,
|
||||
) = AuthorizationService.get_open_id_args()
|
||||
bearer_token = AuthorizationService().get_bearer_token(public_access_token)
|
||||
auth_bearer_string = f"Bearer {bearer_token['access_token']}"
|
||||
headers = {
|
||||
@ -329,9 +329,9 @@ class AuthorizationService:
|
||||
}
|
||||
data = {
|
||||
"grant_type": "urn:ietf:params:oauth:grant-type:uma-ticket",
|
||||
"audience": keycloak_client_id,
|
||||
"audience": open_id_client_id,
|
||||
}
|
||||
request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token"
|
||||
request_url = f"{open_id_server_url}/realms/{open_id_realm_name}/protocol/openid-connect/token"
|
||||
permission_response = requests.post(request_url, headers=headers, data=data)
|
||||
permission: dict = json.loads(permission_response.text)
|
||||
|
||||
|
@ -16,7 +16,7 @@ class BaseTest:
|
||||
if isinstance(user, UserModel):
|
||||
return user
|
||||
|
||||
user = UserService().create_user("local", username, username=username)
|
||||
user = UserService().create_user("internal", username, username=username)
|
||||
if isinstance(user, UserModel):
|
||||
UserService().create_principal(user_id=user.id)
|
||||
return user
|
||||
@ -26,20 +26,20 @@ class BaseTest:
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def get_keycloak_constants(app: Flask) -> tuple:
|
||||
"""Get_keycloak_constants."""
|
||||
keycloak_server_url = app.config["KEYCLOAK_SERVER_URL"]
|
||||
keycloak_client_id = app.config["KEYCLOAK_CLIENT_ID"]
|
||||
keycloak_realm_name = app.config["KEYCLOAK_REALM_NAME"]
|
||||
keycloak_client_secret_key = app.config[
|
||||
"KEYCLOAK_CLIENT_SECRET_KEY"
|
||||
def get_open_id_constants(app: Flask) -> tuple:
|
||||
"""Get_open_id_constants."""
|
||||
open_id_server_url = app.config["OPEN_ID_SERVER_URL"]
|
||||
open_id_client_id = app.config["OPEN_ID_CLIENT_ID"]
|
||||
open_id_realm_name = app.config["OPEN_ID_REALM_NAME"]
|
||||
open_id_client_secret_key = app.config[
|
||||
"OPEN_ID_CLIENT_SECRET_KEY"
|
||||
] # noqa: S105
|
||||
|
||||
return (
|
||||
keycloak_server_url,
|
||||
keycloak_client_id,
|
||||
keycloak_realm_name,
|
||||
keycloak_client_secret_key,
|
||||
open_id_server_url,
|
||||
open_id_client_id,
|
||||
open_id_realm_name,
|
||||
open_id_client_secret_key,
|
||||
)
|
||||
|
||||
# @staticmethod
|
||||
|
Loading…
x
Reference in New Issue
Block a user