pre-commit with poetry
parent
811d841afa
commit
8480ecb8fb
|
@ -1,6 +1,5 @@
|
|||
"""Authentication_service."""
|
||||
from keycloak import KeycloakOpenID
|
||||
from keycloak import KeycloakAdmin
|
||||
|
||||
|
||||
class AuthenticationService:
|
||||
|
@ -8,56 +7,72 @@ class AuthenticationService:
|
|||
|
||||
@staticmethod
|
||||
def get_keycloak_openid(server_url, client_id, realm_name, client_secret_key):
|
||||
keycloak_openid = KeycloakOpenID(server_url=server_url,
|
||||
client_id=client_id,
|
||||
realm_name=realm_name,
|
||||
client_secret_key=client_secret_key)
|
||||
"""Get_keycloak_openid."""
|
||||
keycloak_openid = KeycloakOpenID(
|
||||
server_url=server_url,
|
||||
client_id=client_id,
|
||||
realm_name=realm_name,
|
||||
client_secret_key=client_secret_key,
|
||||
)
|
||||
return keycloak_openid
|
||||
|
||||
|
||||
@staticmethod
|
||||
def get_keycloak_token(keycloak_openid, user, password):
|
||||
"""Get_keycloak_token."""
|
||||
token = keycloak_openid.token(user, password)
|
||||
return token
|
||||
|
||||
|
||||
@staticmethod
|
||||
def get_permission_by_token(keycloak_openid, token):
|
||||
"""Get_permission_by_token."""
|
||||
# Get permissions by token
|
||||
# KEYCLOAK_PUBLIC_KEY = keycloak_openid.public_key()
|
||||
# KEYCLOAK_PUBLIC_KEY = "-----BEGIN PUBLIC KEY-----\n" + keycloak_openid.public_key() + "\n-----END PUBLIC KEY-----"
|
||||
# policies = keycloak_openid.get_policies(token['access_token'], method_token_info='decode',
|
||||
# key=KEYCLOAK_PUBLIC_KEY)
|
||||
permissions = keycloak_openid.get_permissions(token['access_token'], method_token_info='introspect')
|
||||
permissions = keycloak_openid.get_permissions(
|
||||
token["access_token"], method_token_info="introspect"
|
||||
)
|
||||
# TODO: Not sure if this is good. Permissions comes back as None
|
||||
return permissions
|
||||
|
||||
@staticmethod
|
||||
def get_uma_permissions_by_token(keycloak_openid, token):
|
||||
permissions = keycloak_openid.uma_permissions(token['access_token'])
|
||||
"""Get_uma_permissions_by_token."""
|
||||
permissions = keycloak_openid.uma_permissions(token["access_token"])
|
||||
return permissions
|
||||
|
||||
|
||||
@staticmethod
|
||||
def get_uma_permissions_by_token_for_resource_and_scope(keycloak_openid, token, resource, scope):
|
||||
permissions = keycloak_openid.uma_permissions(token['access_token'], permissions=f"{resource}#{scope}")
|
||||
def get_uma_permissions_by_token_for_resource_and_scope(
|
||||
keycloak_openid, token, resource, scope
|
||||
):
|
||||
"""Get_uma_permissions_by_token_for_resource_and_scope."""
|
||||
permissions = keycloak_openid.uma_permissions(
|
||||
token["access_token"], permissions=f"{resource}#{scope}"
|
||||
)
|
||||
return permissions
|
||||
|
||||
|
||||
@staticmethod
|
||||
def get_auth_status_for_resource_and_scope_by_token(keycloak_openid, token, resource, scope):
|
||||
auth_status = keycloak_openid.has_uma_access(token['access_token'], f"{resource}#{scope}")
|
||||
def get_auth_status_for_resource_and_scope_by_token(
|
||||
keycloak_openid, token, resource, scope
|
||||
):
|
||||
"""Get_auth_status_for_resource_and_scope_by_token."""
|
||||
auth_status = keycloak_openid.has_uma_access(
|
||||
token["access_token"], f"{resource}#{scope}"
|
||||
)
|
||||
return auth_status
|
||||
|
||||
|
||||
# TODO: Get this to work
|
||||
@staticmethod
|
||||
def get_keycloak_admin():
|
||||
keycloak_admin = KeycloakAdmin(server_url="http://localhost:8080/auth/",
|
||||
username='admin',
|
||||
password='admin',
|
||||
realm_name="stackoverflow-demo",
|
||||
# user_realm_name="",
|
||||
# client_secret_key="seciKpRanUReL0ksZaFm5nfjhMUKHVAO",
|
||||
verify=True)
|
||||
return keycloak_admin
|
||||
# @staticmethod
|
||||
# def get_keycloak_admin():
|
||||
# """Get_keycloak_admin."""
|
||||
# # TODO: Get this to work
|
||||
# keycloak_admin = KeycloakAdmin(
|
||||
# server_url="http://localhost:8080/auth/",
|
||||
# username="admin",
|
||||
# password="admin",
|
||||
# realm_name="stackoverflow-demo",
|
||||
# # user_realm_name="",
|
||||
# # client_secret_key="seciKpRanUReL0ksZaFm5nfjhMUKHVAO",
|
||||
# verify=True,
|
||||
# )
|
||||
# return keycloak_admin
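Review bot: The commented-out `get_keycloak_admin` block at the end of this hunk still carries the `admin`/`admin` login and a literal `client_secret_key` value, so the credentials stay in the file and in history even though the code no longer runs. I would delete the block instead of commenting it out, and treat the secret as exposed (rotate it) if it was ever valid outside a local demo realm. Any future version should take `server_url`, `username`, `password` and `realm_name` as parameters, the way `get_keycloak_openid` does, so that they come from configuration and not from source. The `server_url` in that block is plain `http://`, so `verify=True` has no certificate to check; that is only acceptable for localhost. Separately, the added docstrings such as `"""Get_keycloak_openid."""` only restate the function name; something like `"""Build a KeycloakOpenID client for the given server, realm and client."""` would satisfy the linter and also inform the reader.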
|
||||
|
|
|
@ -8,9 +8,7 @@
|
|||
"ownerManagedAccess": false,
|
||||
"attributes": {},
|
||||
"_id": "0f0c6dcf-9b86-419d-8331-ce6dd1f779a1",
|
||||
"uris": [
|
||||
"/*"
|
||||
]
|
||||
"uris": ["/*"]
|
||||
},
|
||||
{
|
||||
"name": "View Account Resource",
|
||||
|
@ -18,9 +16,7 @@
|
|||
"displayName": "View Account Resource",
|
||||
"attributes": {},
|
||||
"_id": "6934ad55-cd6a-46d9-8653-7b1966973917",
|
||||
"uris": [
|
||||
"account/{id}"
|
||||
],
|
||||
"uris": ["account/{id}"],
|
||||
"scopes": [
|
||||
{
|
||||
"name": "account:view"
|
||||
|
@ -103,4 +99,4 @@
|
|||
}
|
||||
],
|
||||
"decisionStrategy": "UNANIMOUS"
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,4 @@
|
|||
"""Test_authentication."""
|
||||
from flask.testing import FlaskClient
|
||||
|
||||
from keycloak.authorization import Authorization
|
||||
from keycloak.keycloak_openid import KeycloakOpenID
|
||||
from keycloak.uma_permissions import AuthStatus
|
||||
|
@ -20,6 +18,7 @@ scope = "account:view"
|
|||
|
||||
|
||||
def test_get_keycloak_openid_client():
|
||||
"""Test_get_keycloak_openid_client."""
|
||||
keycloak_openid_client = AuthenticationService.get_keycloak_openid(
|
||||
server_url, client_id, realm_name, client_secret_key
|
||||
)
|
||||
|
@ -28,68 +27,79 @@ def test_get_keycloak_openid_client():
|
|||
|
||||
|
||||
def test_get_keycloak_token():
|
||||
"""Test_get_keycloak_token."""
|
||||
keycloak_openid = AuthenticationService.get_keycloak_openid(
|
||||
server_url, client_id, realm_name, client_secret_key
|
||||
)
|
||||
token = keycloak_openid.token(user, password)
|
||||
assert isinstance(token, dict)
|
||||
assert isinstance(token['access_token'], str)
|
||||
assert isinstance(token['refresh_token'], str)
|
||||
assert token['expires_in'] == 300
|
||||
assert token['refresh_expires_in'] == 1800
|
||||
assert token['token_type'] == 'Bearer'
|
||||
assert isinstance(token["access_token"], str)
|
||||
assert isinstance(token["refresh_token"], str)
|
||||
assert token["expires_in"] == 300
|
||||
assert token["refresh_expires_in"] == 1800
|
||||
assert token["token_type"] == "Bearer"
|
||||
|
||||
|
||||
def test_get_permission_by_token():
|
||||
"""Test_get_permission_by_token."""
|
||||
keycloak_openid = AuthenticationService.get_keycloak_openid(
|
||||
server_url, client_id, realm_name, client_secret_key
|
||||
)
|
||||
keycloak_openid.load_authorization_config("tests/spiffworkflow_backend/integration/bank-api-authz-config.json")
|
||||
keycloak_openid.load_authorization_config(
|
||||
"tests/spiffworkflow_backend/integration/bank-api-authz-config.json"
|
||||
)
|
||||
token = keycloak_openid.token(user, password)
|
||||
|
||||
permissions = AuthenticationService.get_permission_by_token(keycloak_openid, token)
|
||||
AuthenticationService.get_permission_by_token(keycloak_openid, token)
|
||||
# TODO: permissions comes back as None. Is this right?
|
||||
print("test_get_permission_by_token")
|
||||
|
||||
|
||||
def test_get_uma_permissions_by_token():
|
||||
"""Test_get_uma_permissions_by_token."""
|
||||
keycloak_openid = AuthenticationService.get_keycloak_openid(
|
||||
server_url, client_id, realm_name, client_secret_key
|
||||
)
|
||||
token = keycloak_openid.token(user, password)
|
||||
uma_permissions = AuthenticationService.get_uma_permissions_by_token(keycloak_openid, token)
|
||||
uma_permissions = AuthenticationService.get_uma_permissions_by_token(
|
||||
keycloak_openid, token
|
||||
)
|
||||
assert isinstance(uma_permissions, list)
|
||||
assert len(uma_permissions) == 2
|
||||
for permission in uma_permissions:
|
||||
assert 'rsname' in permission
|
||||
if permission['rsname'] == "View Account Resource":
|
||||
assert 'scopes' in permission
|
||||
assert isinstance(permission['scopes'], list)
|
||||
assert len(permission['scopes']) == 1
|
||||
assert permission['scopes'][0] == "account:view"
|
||||
assert "rsname" in permission
|
||||
if permission["rsname"] == "View Account Resource":
|
||||
assert "scopes" in permission
|
||||
assert isinstance(permission["scopes"], list)
|
||||
assert len(permission["scopes"]) == 1
|
||||
assert permission["scopes"][0] == "account:view"
|
||||
|
||||
|
||||
def test_get_uma_permissions_by_token_for_resource_and_scope():
|
||||
"""Test_get_uma_permissions_by_token_for_resource_and_scope."""
|
||||
keycloak_openid = AuthenticationService.get_keycloak_openid(
|
||||
server_url, client_id, realm_name, client_secret_key
|
||||
)
|
||||
token = keycloak_openid.token(user, password)
|
||||
permissions = AuthenticationService.get_uma_permissions_by_token_for_resource_and_scope(
|
||||
keycloak_openid, token, resource, scope
|
||||
permissions = (
|
||||
AuthenticationService.get_uma_permissions_by_token_for_resource_and_scope(
|
||||
keycloak_openid, token, resource, scope
|
||||
)
|
||||
)
|
||||
assert isinstance(permissions, list)
|
||||
assert len(permissions) == 1
|
||||
assert isinstance(permissions[0], dict)
|
||||
permission = permissions[0]
|
||||
assert 'rsname' in permission
|
||||
assert permission['rsname'] == resource
|
||||
assert 'scopes' in permission
|
||||
assert isinstance(permission['scopes'], list)
|
||||
assert len(permission['scopes']) == 1
|
||||
assert permission['scopes'][0] == scope
|
||||
assert "rsname" in permission
|
||||
assert permission["rsname"] == resource
|
||||
assert "scopes" in permission
|
||||
assert isinstance(permission["scopes"], list)
|
||||
assert len(permission["scopes"]) == 1
|
||||
assert permission["scopes"][0] == scope
|
||||
|
||||
|
||||
def test_get_auth_status_for_resource_and_scope_by_token():
|
||||
"""Test_get_auth_status_for_resource_and_scope_by_token."""
|
||||
keycloak_openid = AuthenticationService.get_keycloak_openid(
|
||||
server_url, client_id, realm_name, client_secret_key
|
||||
)
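Review bot: `test_get_permission_by_token` now discards the return value of `AuthenticationService.get_permission_by_token(keycloak_openid, token)` and asserts nothing, so it passes whatever the introspection call returns, including the `None` that the TODO mentions. For an authorization lookup I would keep the binding, `permissions = AuthenticationService.get_permission_by_token(keycloak_openid, token)`, and assert on the expected value, or mark the test `@pytest.mark.xfail(reason="get_permissions returns None")` until the open question is settled. The leftover `print("test_get_permission_by_token")` can be dropped as well. The hunk ends inside `test_get_auth_status_for_resource_and_scope_by_token`, whose body continues outside what is shown.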
|
||||
|
|