moved code to auth service

This commit is contained in:
mike cullerton 2022-07-25 14:00:42 -04:00
parent b33a7e8565
commit d19825a6a4
3 changed files with 182 additions and 162 deletions

View File

@ -12,6 +12,9 @@ from keycloak import KeycloakOpenID # type: ignore
from keycloak.uma_permissions import AuthStatus # type: ignore
from keycloak import KeycloakAdmin
from spiffworkflow_backend.services.authorization_service import AuthorizationService
class AuthenticationServiceProviders(enum.Enum):
keycloak = 'keycloak'
internal = 'internal'
@ -20,9 +23,10 @@ class AuthenticationServiceProviders(enum.Enum):
class PublicAuthenticationService:
"""Not sure where/if this ultimately lives.
It uses a separate public keycloak client: spiffworkflow-frontend
Used during development to make testing easy.
"""
def get_public_access_token(self, username, password) -> dict:
keycloak_server_url, keycloak_client_id, keycloak_realm_name, keycloak_client_secret_key = AuthenticationService.get_keycloak_args()
keycloak_server_url, keycloak_client_id, keycloak_realm_name, keycloak_client_secret_key = AuthorizationService.get_keycloak_args()
# Get public access token
request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token"
headers = {"Content-Type": "application/x-www-form-urlencoded"}
@ -40,160 +44,6 @@ class PublicAuthenticationService:
class AuthenticationService:
"""AuthenticationService."""
@staticmethod
def get_keycloak_args():
keycloak_server_url = current_app.config['KEYCLOAK_SERVER_URL']
keycloak_client_id = current_app.config["KEYCLOAK_CLIENT_ID"]
keycloak_realm_name = current_app.config["KEYCLOAK_REALM_NAME"]
keycloak_client_secret_key = current_app.config["KEYCLOAK_CLIENT_SECRET_KEY"] # noqa: S105
return keycloak_server_url, keycloak_client_id, keycloak_realm_name, keycloak_client_secret_key
def get_user_info_from_public_access_token(self, public_access_token):
"""This seems to work with basic tokens too"""
json_data = None
keycloak_server_url, keycloak_client_id, keycloak_realm_name, keycloak_client_secret_key = AuthenticationService.get_keycloak_args()
bearer_token = self.get_bearer_token(public_access_token)
auth_bearer_string = f"Bearer {bearer_token['access_token']}"
# auth_bearer_string = f"Bearer {public_access_token}"
headers = {"Content-Type": "application/json",
"Authorization": auth_bearer_string}
data = {'grant_type': 'urn:ietf:params:oauth:grant-type:token-exchange',
'client_id': keycloak_client_id,
# "subject_token": bearer_token['access_token'],
"subject_token": public_access_token,
"audience": keycloak_client_id}
request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/userinfo"
try:
request_response = requests.post(request_url, headers=headers, data=data)
except Exception as e:
print(f"get_user_from_token: Exeption: {e}")
else:
print("else")
json_data = json.loads(request_response.text)
return json_data
def refresh_token(self, basic_token):
# if isinstance(basic_token, str):
# basic_token = eval(basic_token)
keycloak_server_url, keycloak_client_id, keycloak_realm_name, keycloak_client_secret_key = AuthenticationService.get_keycloak_args()
headers = {"Content-Type": "application/x-www-form-urlencoded"}
request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token"
data = {'grant_type': 'refresh_token',
'client_id': 'spiffworkflow-frontend',
'subject_token': basic_token,
'refresh_token': basic_token}
refresh_response = requests.post(request_url, headers=headers, data=data)
refresh_token = json.loads(refresh_response.text)
return refresh_token
def get_bearer_token(self, basic_token):
keycloak_server_url, keycloak_client_id, keycloak_realm_name, keycloak_client_secret_key = AuthenticationService.get_keycloak_args()
BACKEND_BASIC_AUTH_STRING = f"{keycloak_client_id}:{keycloak_client_secret_key}"
BACKEND_BASIC_AUTH_BYTES = bytes(BACKEND_BASIC_AUTH_STRING, encoding='ascii')
BACKEND_BASIC_AUTH = base64.b64encode(BACKEND_BASIC_AUTH_BYTES)
headers = {"Content-Type": "application/x-www-form-urlencoded",
"Authorization": f"Basic {BACKEND_BASIC_AUTH.decode('utf-8')}"}
data = {'grant_type': 'urn:ietf:params:oauth:grant-type:token-exchange',
'client_id': keycloak_client_id,
"subject_token": basic_token,
"audience": keycloak_client_id}
request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token"
backend_response = requests.post(request_url, headers=headers, data=data)
# json_data = json.loads(backend_response.text)
# bearer_token = json_data['access_token']
bearer_token = json.loads(backend_response.text)
return bearer_token
def introspect_token(self, basic_token):
keycloak_server_url, keycloak_client_id, keycloak_realm_name, keycloak_client_secret_key = AuthenticationService.get_keycloak_args()
bearer_token = AuthenticationService().get_bearer_token(basic_token)
auth_bearer_string = f"Bearer {bearer_token['access_token']}"
headers = {"Content-Type": "application/x-www-form-urlencoded",
"Authorization": auth_bearer_string}
data = {'client_id': keycloak_client_id,
'client_secret': keycloak_client_secret_key,
'token': basic_token}
request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token/introspect"
introspect_response = requests.post(request_url, headers=headers, data=data)
introspection = json.loads(introspect_response.text)
return introspection
def get_permission_by_basic_token(self, basic_token):
keycloak_server_url, keycloak_client_id, keycloak_realm_name, keycloak_client_secret_key = AuthenticationService.get_keycloak_args()
# basic_token = AuthenticationService().refresh_token(basic_token)
# bearer_token = AuthenticationService().get_bearer_token(basic_token['access_token'])
bearer_token = AuthenticationService().get_bearer_token(basic_token)
# auth_bearer_string = f"Bearer {bearer_token['access_token']}"
auth_bearer_string = f"Bearer {bearer_token}"
headers = {"Content-Type": "application/x-www-form-urlencoded",
"Authorization": auth_bearer_string}
data = {'client_id': keycloak_client_id,
'client_secret': keycloak_client_secret_key,
"grant_type": 'urn:ietf:params:oauth:grant-type:uma-ticket',
"response_mode": "permissions",
"audience": keycloak_client_id
}
request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token"
permission_response = requests.post(request_url, headers=headers, data=data)
permission = json.loads(permission_response.text)
return permission
def get_auth_status_for_resource_and_scope_by_token(self, basic_token, resource, scope):
keycloak_server_url, keycloak_client_id, keycloak_realm_name, keycloak_client_secret_key = AuthenticationService.get_keycloak_args()
# basic_token = AuthenticationService().refresh_token(basic_token)
bearer_token = AuthenticationService().get_bearer_token(basic_token)
auth_bearer_string = f"Bearer {bearer_token['access_token']}"
headers = {"Content-Type": "application/x-www-form-urlencoded",
"Authorization": auth_bearer_string}
data = {
'client_id': keycloak_client_id,
'client_secret': keycloak_client_secret_key,
"grant_type": 'urn:ietf:params:oauth:grant-type:uma-ticket',
"permission": f"{resource}#{scope}",
"response_mode": "permissions",
"audience": keycloak_client_id
}
request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token"
auth_response = requests.post(request_url, headers=headers, data=data)
print("get_auth_status_for_resource_and_scope_by_token")
auth_status = json.loads(auth_response.text)
return auth_status
def get_permissions_by_token_for_resource_and_scope(self, basic_token, resource, scope):
keycloak_server_url, keycloak_client_id, keycloak_realm_name, keycloak_client_secret_key = AuthenticationService.get_keycloak_args()
# basic_token = AuthenticationService().refresh_token(basic_token)
# bearer_token = AuthenticationService().get_bearer_token(basic_token['access_token'])
bearer_token = AuthenticationService().get_bearer_token(basic_token)
auth_bearer_string = f"Bearer {bearer_token['access_token']}"
headers = {"Content-Type": "application/x-www-form-urlencoded",
"Authorization": auth_bearer_string}
data = {'client_id': keycloak_client_id,
'client_secret': keycloak_client_secret_key,
"grant_type": 'urn:ietf:params:oauth:grant-type:uma-ticket',
"response_mode": "permissions",
"permission": f"{resource}#{scope}",
"audience": keycloak_client_id
}
request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token"
permission_response = requests.post(request_url, headers=headers, data=data)
permission = json.loads(permission_response.text)
return permission

View File

@ -0,0 +1,169 @@
"""Authorization_service."""
import requests
import base64
import json
import enum
from flask import g
from flask import current_app
class AuthorizationService:
"""Determine whether a user has permission to perform their request."""
@staticmethod
def get_keycloak_args():
keycloak_server_url = current_app.config['KEYCLOAK_SERVER_URL']
keycloak_client_id = current_app.config["KEYCLOAK_CLIENT_ID"]
keycloak_realm_name = current_app.config["KEYCLOAK_REALM_NAME"]
keycloak_client_secret_key = current_app.config["KEYCLOAK_CLIENT_SECRET_KEY"] # noqa: S105
return keycloak_server_url, keycloak_client_id, keycloak_realm_name, keycloak_client_secret_key
def get_user_info_from_public_access_token(self, public_access_token):
"""This seems to work with basic tokens too"""
json_data = None
keycloak_server_url, keycloak_client_id, keycloak_realm_name, keycloak_client_secret_key = AuthorizationService.get_keycloak_args()
bearer_token = self.get_bearer_token(public_access_token)
auth_bearer_string = f"Bearer {bearer_token['access_token']}"
# auth_bearer_string = f"Bearer {public_access_token}"
headers = {"Content-Type": "application/json",
"Authorization": auth_bearer_string}
data = {'grant_type': 'urn:ietf:params:oauth:grant-type:token-exchange',
'client_id': keycloak_client_id,
# "subject_token": bearer_token['access_token'],
"subject_token": public_access_token,
"audience": keycloak_client_id}
request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/userinfo"
try:
request_response = requests.post(request_url, headers=headers, data=data)
except Exception as e:
print(f"get_user_from_token: Exeption: {e}")
else:
print("else")
json_data = json.loads(request_response.text)
return json_data
def refresh_token(self, basic_token):
# if isinstance(basic_token, str):
# basic_token = eval(basic_token)
keycloak_server_url, keycloak_client_id, keycloak_realm_name, keycloak_client_secret_key = AuthorizationService.get_keycloak_args()
headers = {"Content-Type": "application/x-www-form-urlencoded"}
request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token"
data = {'grant_type': 'refresh_token',
'client_id': 'spiffworkflow-frontend',
'subject_token': basic_token,
'refresh_token': basic_token}
refresh_response = requests.post(request_url, headers=headers, data=data)
refresh_token = json.loads(refresh_response.text)
return refresh_token
def get_bearer_token(self, basic_token):
keycloak_server_url, keycloak_client_id, keycloak_realm_name, keycloak_client_secret_key = AuthorizationService.get_keycloak_args()
BACKEND_BASIC_AUTH_STRING = f"{keycloak_client_id}:{keycloak_client_secret_key}"
BACKEND_BASIC_AUTH_BYTES = bytes(BACKEND_BASIC_AUTH_STRING, encoding='ascii')
BACKEND_BASIC_AUTH = base64.b64encode(BACKEND_BASIC_AUTH_BYTES)
headers = {"Content-Type": "application/x-www-form-urlencoded",
"Authorization": f"Basic {BACKEND_BASIC_AUTH.decode('utf-8')}"}
data = {'grant_type': 'urn:ietf:params:oauth:grant-type:token-exchange',
'client_id': keycloak_client_id,
"subject_token": basic_token,
"audience": keycloak_client_id}
request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token"
backend_response = requests.post(request_url, headers=headers, data=data)
# json_data = json.loads(backend_response.text)
# bearer_token = json_data['access_token']
bearer_token = json.loads(backend_response.text)
return bearer_token
def introspect_token(self, basic_token):
keycloak_server_url, keycloak_client_id, keycloak_realm_name, keycloak_client_secret_key = AuthorizationService.get_keycloak_args()
bearer_token = AuthorizationService().get_bearer_token(basic_token)
auth_bearer_string = f"Bearer {bearer_token['access_token']}"
headers = {"Content-Type": "application/x-www-form-urlencoded",
"Authorization": auth_bearer_string}
data = {'client_id': keycloak_client_id,
'client_secret': keycloak_client_secret_key,
'token': basic_token}
request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token/introspect"
introspect_response = requests.post(request_url, headers=headers, data=data)
introspection = json.loads(introspect_response.text)
return introspection
def get_permission_by_basic_token(self, basic_token):
keycloak_server_url, keycloak_client_id, keycloak_realm_name, keycloak_client_secret_key = AuthorizationService.get_keycloak_args()
# basic_token = AuthorizationService().refresh_token(basic_token)
# bearer_token = AuthorizationService().get_bearer_token(basic_token['access_token'])
bearer_token = AuthorizationService().get_bearer_token(basic_token)
# auth_bearer_string = f"Bearer {bearer_token['access_token']}"
auth_bearer_string = f"Bearer {bearer_token}"
headers = {"Content-Type": "application/x-www-form-urlencoded",
"Authorization": auth_bearer_string}
data = {'client_id': keycloak_client_id,
'client_secret': keycloak_client_secret_key,
"grant_type": 'urn:ietf:params:oauth:grant-type:uma-ticket',
"response_mode": "permissions",
"audience": keycloak_client_id
}
request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token"
permission_response = requests.post(request_url, headers=headers, data=data)
permission = json.loads(permission_response.text)
return permission
def get_auth_status_for_resource_and_scope_by_token(self, basic_token, resource, scope):
keycloak_server_url, keycloak_client_id, keycloak_realm_name, keycloak_client_secret_key = AuthorizationService.get_keycloak_args()
# basic_token = AuthorizationService().refresh_token(basic_token)
bearer_token = AuthorizationService().get_bearer_token(basic_token)
auth_bearer_string = f"Bearer {bearer_token['access_token']}"
headers = {"Content-Type": "application/x-www-form-urlencoded",
"Authorization": auth_bearer_string}
data = {
'client_id': keycloak_client_id,
'client_secret': keycloak_client_secret_key,
"grant_type": 'urn:ietf:params:oauth:grant-type:uma-ticket',
"permission": f"{resource}#{scope}",
"response_mode": "permissions",
"audience": keycloak_client_id
}
request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token"
auth_response = requests.post(request_url, headers=headers, data=data)
print("get_auth_status_for_resource_and_scope_by_token")
auth_status = json.loads(auth_response.text)
return auth_status
def get_permissions_by_token_for_resource_and_scope(self, basic_token, resource, scope):
keycloak_server_url, keycloak_client_id, keycloak_realm_name, keycloak_client_secret_key = AuthorizationService.get_keycloak_args()
# basic_token = AuthorizationService().refresh_token(basic_token)
# bearer_token = AuthorizationService().get_bearer_token(basic_token['access_token'])
bearer_token = AuthorizationService().get_bearer_token(basic_token)
auth_bearer_string = f"Bearer {bearer_token['access_token']}"
headers = {"Content-Type": "application/x-www-form-urlencoded",
"Authorization": auth_bearer_string}
data = {'client_id': keycloak_client_id,
'client_secret': keycloak_client_secret_key,
"grant_type": 'urn:ietf:params:oauth:grant-type:uma-ticket',
"response_mode": "permissions",
"permission": f"{resource}#{scope}",
"audience": keycloak_client_id
}
request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token"
permission_response = requests.post(request_url, headers=headers, data=data)
permission = json.loads(permission_response.text)
return permission
class KeycloakAuthorization:
"""Interface with Keycloak server."""

View File

@ -13,7 +13,8 @@ from keycloak.keycloak_openid import KeycloakOpenID # type: ignore
from keycloak.uma_permissions import AuthStatus # type: ignore
from tests.spiffworkflow_backend.integration.base_test import BaseTest
from spiffworkflow_backend.services.authentication_service import AuthenticationService, PublicAuthenticationService, KeycloakAuthenticationService
from spiffworkflow_backend.services.authentication_service import PublicAuthenticationService, KeycloakAuthenticationService
from spiffworkflow_backend.services.authorization_service import AuthorizationService
from urllib.parse import urlencode
@ -41,7 +42,7 @@ class TestAuthentication(BaseTest):
def test_get_bearer_token(self, app: Flask) -> None:
for user_id in ('user_1', 'user_2', 'admin_1', 'admin_2'):
public_access_token = self.get_public_access_token(user_id, user_id)
bearer_token = AuthenticationService().get_bearer_token(public_access_token)
bearer_token = AuthorizationService().get_bearer_token(public_access_token)
assert isinstance(public_access_token, str)
assert isinstance(bearer_token, dict)
assert 'access_token' in bearer_token
@ -56,7 +57,7 @@ class TestAuthentication(BaseTest):
def test_get_user_info_from_public_access_token(self, app: Flask) -> None:
for user_id in ('user_1', 'user_2', 'admin_1', 'admin_2'):
public_access_token = self.get_public_access_token(user_id, user_id)
user_info = AuthenticationService().get_user_info_from_public_access_token(public_access_token)
user_info = AuthorizationService().get_user_info_from_public_access_token(public_access_token)
assert 'sub' in user_info
assert isinstance(user_info['sub'], str)
assert len(user_info['sub']) == 36
@ -69,7 +70,7 @@ class TestAuthentication(BaseTest):
keycloak_server_url, keycloak_client_id, keycloak_realm_name, keycloak_client_secret_key = self.get_keycloak_constants(app)
for user_id in ('user_1', 'user_2', 'admin_1', 'admin_2'):
basic_token = self.get_public_access_token(user_id, user_id)
introspection = AuthenticationService().introspect_token(basic_token)
introspection = AuthorizationService().introspect_token(basic_token)
assert isinstance(introspection, dict)
assert introspection['typ'] == 'Bearer'
assert introspection['preferred_username'] == user_id
@ -103,7 +104,7 @@ class TestAuthentication(BaseTest):
for user_id in ('user_1', 'user_2', 'admin_1', 'admin_2'):
output[user_id] = {}
basic_token = self.get_public_access_token(user_id, user_id)
permissions = AuthenticationService().get_permission_by_basic_token(basic_token)
permissions = AuthorizationService().get_permission_by_basic_token(basic_token)
if isinstance(permissions, list):
for permission in permissions:
resource_name = permission['rsname']
@ -163,7 +164,7 @@ class TestAuthentication(BaseTest):
for resource in resources:
output[user_id][resource] = {}
for scope in 'instantiate', 'read', 'update', 'delete':
auth_status = AuthenticationService().get_auth_status_for_resource_and_scope_by_token(
auth_status = AuthorizationService().get_auth_status_for_resource_and_scope_by_token(
basic_token, resource, scope
)
output[user_id][resource][scope] = auth_status
@ -178,7 +179,7 @@ class TestAuthentication(BaseTest):
for resource in resource_names:
output[user_id][resource] = {}
for scope in 'instantiate', 'read', 'update', 'delete':
permissions = AuthenticationService().\
permissions = AuthorizationService().\
get_permissions_by_token_for_resource_and_scope(basic_token, resource, scope)
output[user_id][resource][scope] = permissions
print("test_get_permissions_by_token_for_resource_and_scope")