Feature/group mapping from keycloak (#457)
* some basics to set a user groups based on info in keycloak w/ burnettk * test for adding groups from token now passes * do not remove users from groups when running refresh_permissions if specified --------- Co-authored-by: jasquat <jasquat@users.noreply.github.com>
This commit is contained in:
parent
bc4e475809
commit
c056b89006
|
@ -75,17 +75,6 @@ class UserModel(SpiffworkflowBaseDBModel):
|
||||||
algorithm="HS256",
|
algorithm="HS256",
|
||||||
)
|
)
|
||||||
|
|
||||||
# @classmethod
|
|
||||||
# def from_open_id_user_info(cls, user_info: dict) -> Any:
|
|
||||||
# """From_open_id_user_info."""
|
|
||||||
# instance = cls()
|
|
||||||
# instance.service = "keycloak"
|
|
||||||
# instance.service_id = user_info["sub"]
|
|
||||||
# instance.name = user_info["preferred_username"]
|
|
||||||
# instance.username = user_info["sub"]
|
|
||||||
#
|
|
||||||
# return instance
|
|
||||||
|
|
||||||
def as_dict(self) -> dict[str, Any]:
|
def as_dict(self) -> dict[str, Any]:
|
||||||
# dump the user using our json encoder and then load it back up as a dict
|
# dump the user using our json encoder and then load it back up as a dict
|
||||||
# to remove unwanted field types
|
# to remove unwanted field types
|
||||||
|
|
|
@ -7,6 +7,10 @@ from spiffworkflow_backend.models.group import GroupModel
|
||||||
from spiffworkflow_backend.models.user import UserModel
|
from spiffworkflow_backend.models.user import UserModel
|
||||||
|
|
||||||
|
|
||||||
|
class UserGroupAssignmentNotFoundError(Exception):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
class UserGroupAssignmentModel(SpiffworkflowBaseDBModel):
|
class UserGroupAssignmentModel(SpiffworkflowBaseDBModel):
|
||||||
__tablename__ = "user_group_assignment"
|
__tablename__ = "user_group_assignment"
|
||||||
__table_args__ = (db.UniqueConstraint("user_id", "group_id", name="user_group_assignment_unique"),)
|
__table_args__ = (db.UniqueConstraint("user_id", "group_id", name="user_group_assignment_unique"),)
|
||||||
|
|
|
@ -8,6 +8,10 @@ from spiffworkflow_backend.services.authorization_service import AuthorizationSe
|
||||||
class RefreshPermissions(Script):
|
class RefreshPermissions(Script):
|
||||||
def get_description(self) -> str:
|
def get_description(self) -> str:
|
||||||
return """Add permissions using a dict.
|
return """Add permissions using a dict.
|
||||||
|
If group_permissions_only is True then it will ignore adding and removing users from groups.
|
||||||
|
This is useful if the openid server is handling assigning users to groups.
|
||||||
|
|
||||||
|
Example payload:
|
||||||
group_info: [
|
group_info: [
|
||||||
{
|
{
|
||||||
'name': group_identifier,
|
'name': group_identifier,
|
||||||
|
@ -29,4 +33,4 @@ class RefreshPermissions(Script):
|
||||||
**kwargs: Any,
|
**kwargs: Any,
|
||||||
) -> Any:
|
) -> Any:
|
||||||
group_info = args[0]
|
group_info = args[0]
|
||||||
AuthorizationService.refresh_permissions(group_info)
|
AuthorizationService.refresh_permissions(group_info, **kwargs)
|
||||||
|
|
|
@ -306,11 +306,6 @@ class AuthorizationService:
|
||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
# TODO: we can add the before_request to the blueprint
|
|
||||||
# directly when we switch over from connexion routes
|
|
||||||
# to blueprint routes
|
|
||||||
# @process_api_blueprint.before_request
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def check_for_permission(cls) -> None:
|
def check_for_permission(cls) -> None:
|
||||||
if cls.should_disable_auth_for_request():
|
if cls.should_disable_auth_for_request():
|
||||||
|
@ -439,6 +434,10 @@ class AuthorizationService:
|
||||||
user_attributes["service"] = user_info["iss"]
|
user_attributes["service"] = user_info["iss"]
|
||||||
user_attributes["service_id"] = user_info["sub"]
|
user_attributes["service_id"] = user_info["sub"]
|
||||||
|
|
||||||
|
desired_group_identifiers = None
|
||||||
|
if "groups" in user_info:
|
||||||
|
desired_group_identifiers = user_info["groups"]
|
||||||
|
|
||||||
for field_index, tenant_specific_field in enumerate(
|
for field_index, tenant_specific_field in enumerate(
|
||||||
current_app.config["SPIFFWORKFLOW_BACKEND_OPEN_ID_TENANT_SPECIFIC_FIELDS"]
|
current_app.config["SPIFFWORKFLOW_BACKEND_OPEN_ID_TENANT_SPECIFIC_FIELDS"]
|
||||||
):
|
):
|
||||||
|
@ -452,7 +451,6 @@ class AuthorizationService:
|
||||||
.filter(UserModel.username == user_attributes["username"])
|
.filter(UserModel.username == user_attributes["username"])
|
||||||
.first()
|
.first()
|
||||||
)
|
)
|
||||||
|
|
||||||
if user_model is None:
|
if user_model is None:
|
||||||
current_app.logger.debug("create_user in login_return")
|
current_app.logger.debug("create_user in login_return")
|
||||||
is_new_user = True
|
is_new_user = True
|
||||||
|
@ -465,11 +463,27 @@ class AuthorizationService:
|
||||||
if current_value != value:
|
if current_value != value:
|
||||||
user_db_model_changed = True
|
user_db_model_changed = True
|
||||||
setattr(user_model, key, value)
|
setattr(user_model, key, value)
|
||||||
|
|
||||||
if user_db_model_changed:
|
if user_db_model_changed:
|
||||||
db.session.add(user_model)
|
db.session.add(user_model)
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
|
||||||
|
if desired_group_identifiers is not None:
|
||||||
|
if not isinstance(desired_group_identifiers, list):
|
||||||
|
current_app.logger.error(
|
||||||
|
f"Invalid groups property in token: {desired_group_identifiers}."
|
||||||
|
"If groups is specified, it must be a list"
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
for desired_group_identifier in desired_group_identifiers:
|
||||||
|
GroupService.add_user_to_group(user_model, desired_group_identifier)
|
||||||
|
current_group_identifiers = [g.identifier for g in user_model.groups]
|
||||||
|
groups_to_remove_from_user = [
|
||||||
|
item for item in current_group_identifiers if item not in desired_group_identifiers
|
||||||
|
]
|
||||||
|
for gtrfu in groups_to_remove_from_user:
|
||||||
|
if gtrfu != current_app.config["SPIFFWORKFLOW_BACKEND_DEFAULT_USER_GROUP"]:
|
||||||
|
GroupService.remove_user_from_group(user_model, gtrfu)
|
||||||
|
|
||||||
# this may eventually get too slow.
|
# this may eventually get too slow.
|
||||||
# when it does, be careful about backgrounding, because
|
# when it does, be careful about backgrounding, because
|
||||||
# the user will immediately need permissions to use the site.
|
# the user will immediately need permissions to use the site.
|
||||||
|
@ -772,7 +786,10 @@ class AuthorizationService:
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def add_permissions_from_group_permissions(
|
def add_permissions_from_group_permissions(
|
||||||
cls, group_permissions: list[GroupPermissionsDict], user_model: UserModel | None = None
|
cls,
|
||||||
|
group_permissions: list[GroupPermissionsDict],
|
||||||
|
user_model: UserModel | None = None,
|
||||||
|
group_permissions_only: bool = False,
|
||||||
) -> AddedPermissionDict:
|
) -> AddedPermissionDict:
|
||||||
unique_user_group_identifiers: set[str] = set()
|
unique_user_group_identifiers: set[str] = set()
|
||||||
user_to_group_identifiers: list[UserToGroupDict] = []
|
user_to_group_identifiers: list[UserToGroupDict] = []
|
||||||
|
@ -787,16 +804,17 @@ class AuthorizationService:
|
||||||
for group in group_permissions:
|
for group in group_permissions:
|
||||||
group_identifier = group["name"]
|
group_identifier = group["name"]
|
||||||
GroupService.find_or_create_group(group_identifier)
|
GroupService.find_or_create_group(group_identifier)
|
||||||
for username in group["users"]:
|
if not group_permissions_only:
|
||||||
if user_model and username != user_model.username:
|
for username in group["users"]:
|
||||||
continue
|
if user_model and username != user_model.username:
|
||||||
user_to_group_dict: UserToGroupDict = {
|
continue
|
||||||
"username": username,
|
user_to_group_dict: UserToGroupDict = {
|
||||||
"group_identifier": group_identifier,
|
"username": username,
|
||||||
}
|
"group_identifier": group_identifier,
|
||||||
user_to_group_identifiers.append(user_to_group_dict)
|
}
|
||||||
GroupService.add_user_to_group_or_add_to_waiting(username, group_identifier)
|
user_to_group_identifiers.append(user_to_group_dict)
|
||||||
unique_user_group_identifiers.add(group_identifier)
|
GroupService.add_user_to_group_or_add_to_waiting(username, group_identifier)
|
||||||
|
unique_user_group_identifiers.add(group_identifier)
|
||||||
for group in group_permissions:
|
for group in group_permissions:
|
||||||
group_identifier = group["name"]
|
group_identifier = group["name"]
|
||||||
if user_model and group_identifier not in unique_user_group_identifiers:
|
if user_model and group_identifier not in unique_user_group_identifiers:
|
||||||
|
@ -812,7 +830,7 @@ class AuthorizationService:
|
||||||
)
|
)
|
||||||
unique_user_group_identifiers.add(group_identifier)
|
unique_user_group_identifiers.add(group_identifier)
|
||||||
|
|
||||||
if default_group is not None:
|
if not group_permissions_only and default_group is not None:
|
||||||
if user_model:
|
if user_model:
|
||||||
cls.associate_user_with_group(user_model, default_group)
|
cls.associate_user_with_group(user_model, default_group)
|
||||||
else:
|
else:
|
||||||
|
@ -831,6 +849,7 @@ class AuthorizationService:
|
||||||
added_permissions: AddedPermissionDict,
|
added_permissions: AddedPermissionDict,
|
||||||
initial_permission_assignments: list[PermissionAssignmentModel],
|
initial_permission_assignments: list[PermissionAssignmentModel],
|
||||||
initial_user_to_group_assignments: list[UserGroupAssignmentModel],
|
initial_user_to_group_assignments: list[UserGroupAssignmentModel],
|
||||||
|
group_permissions_only: bool = False,
|
||||||
) -> None:
|
) -> None:
|
||||||
added_permission_assignments = added_permissions["permission_assignments"]
|
added_permission_assignments = added_permissions["permission_assignments"]
|
||||||
added_group_identifiers = added_permissions["group_identifiers"]
|
added_group_identifiers = added_permissions["group_identifiers"]
|
||||||
|
@ -840,18 +859,19 @@ class AuthorizationService:
|
||||||
if ipa not in added_permission_assignments:
|
if ipa not in added_permission_assignments:
|
||||||
db.session.delete(ipa)
|
db.session.delete(ipa)
|
||||||
|
|
||||||
for iutga in initial_user_to_group_assignments:
|
if not group_permissions_only:
|
||||||
# do not remove users from the default user group
|
for iutga in initial_user_to_group_assignments:
|
||||||
if (
|
# do not remove users from the default user group
|
||||||
current_app.config["SPIFFWORKFLOW_BACKEND_DEFAULT_USER_GROUP"] is None
|
if (
|
||||||
or current_app.config["SPIFFWORKFLOW_BACKEND_DEFAULT_USER_GROUP"] != iutga.group.identifier
|
current_app.config["SPIFFWORKFLOW_BACKEND_DEFAULT_USER_GROUP"] is None
|
||||||
):
|
or current_app.config["SPIFFWORKFLOW_BACKEND_DEFAULT_USER_GROUP"] != iutga.group.identifier
|
||||||
current_user_dict: UserToGroupDict = {
|
):
|
||||||
"username": iutga.user.username,
|
current_user_dict: UserToGroupDict = {
|
||||||
"group_identifier": iutga.group.identifier,
|
"username": iutga.user.username,
|
||||||
}
|
"group_identifier": iutga.group.identifier,
|
||||||
if current_user_dict not in added_user_to_group_identifiers:
|
}
|
||||||
db.session.delete(iutga)
|
if current_user_dict not in added_user_to_group_identifiers:
|
||||||
|
db.session.delete(iutga)
|
||||||
|
|
||||||
# do not remove the default user group
|
# do not remove the default user group
|
||||||
added_group_identifiers.add(current_app.config["SPIFFWORKFLOW_BACKEND_DEFAULT_USER_GROUP"])
|
added_group_identifiers.add(current_app.config["SPIFFWORKFLOW_BACKEND_DEFAULT_USER_GROUP"])
|
||||||
|
@ -861,12 +881,19 @@ class AuthorizationService:
|
||||||
db.session.commit()
|
db.session.commit()
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def refresh_permissions(cls, group_permissions: list[GroupPermissionsDict]) -> None:
|
def refresh_permissions(
|
||||||
|
cls, group_permissions: list[GroupPermissionsDict], group_permissions_only: bool = False
|
||||||
|
) -> None:
|
||||||
"""Adds new permission assignments and deletes old ones."""
|
"""Adds new permission assignments and deletes old ones."""
|
||||||
initial_permission_assignments = PermissionAssignmentModel.query.all()
|
initial_permission_assignments = PermissionAssignmentModel.query.all()
|
||||||
initial_user_to_group_assignments = UserGroupAssignmentModel.query.all()
|
initial_user_to_group_assignments = UserGroupAssignmentModel.query.all()
|
||||||
group_permissions = group_permissions + cls.parse_permissions_yaml_into_group_info()
|
group_permissions = group_permissions + cls.parse_permissions_yaml_into_group_info()
|
||||||
added_permissions = cls.add_permissions_from_group_permissions(group_permissions)
|
added_permissions = cls.add_permissions_from_group_permissions(
|
||||||
cls.remove_old_permissions_from_added_permissions(
|
group_permissions, group_permissions_only=group_permissions_only
|
||||||
added_permissions, initial_permission_assignments, initial_user_to_group_assignments
|
)
|
||||||
|
cls.remove_old_permissions_from_added_permissions(
|
||||||
|
added_permissions,
|
||||||
|
initial_permission_assignments,
|
||||||
|
initial_user_to_group_assignments,
|
||||||
|
group_permissions_only=group_permissions_only,
|
||||||
)
|
)
|
||||||
|
|
|
@ -3,7 +3,10 @@ from spiffworkflow_backend.models.group import SPIFF_GUEST_GROUP
|
||||||
from spiffworkflow_backend.models.group import GroupModel
|
from spiffworkflow_backend.models.group import GroupModel
|
||||||
from spiffworkflow_backend.models.user import SPIFF_GUEST_USER
|
from spiffworkflow_backend.models.user import SPIFF_GUEST_USER
|
||||||
from spiffworkflow_backend.models.user import UserModel
|
from spiffworkflow_backend.models.user import UserModel
|
||||||
|
from spiffworkflow_backend.models.user_group_assignment import UserGroupAssignmentModel
|
||||||
|
from spiffworkflow_backend.models.user_group_assignment import UserGroupAssignmentNotFoundError
|
||||||
from spiffworkflow_backend.services.user_service import UserService
|
from spiffworkflow_backend.services.user_service import UserService
|
||||||
|
from sqlalchemy import and_
|
||||||
|
|
||||||
|
|
||||||
class GroupService:
|
class GroupService:
|
||||||
|
@ -18,7 +21,7 @@ class GroupService:
|
||||||
return group
|
return group
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def add_user_to_group_or_add_to_waiting(cls, username: str, group_identifier: str) -> None:
|
def add_user_to_group_or_add_to_waiting(cls, username: str | UserModel, group_identifier: str) -> None:
|
||||||
group = cls.find_or_create_group(group_identifier)
|
group = cls.find_or_create_group(group_identifier)
|
||||||
user = UserModel.query.filter_by(username=username).first()
|
user = UserModel.query.filter_by(username=username).first()
|
||||||
if user:
|
if user:
|
||||||
|
@ -26,6 +29,26 @@ class GroupService:
|
||||||
else:
|
else:
|
||||||
UserService.add_waiting_group_assignment(username, group)
|
UserService.add_waiting_group_assignment(username, group)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def add_user_to_group(cls, user: UserModel, group_identifier: str) -> None:
|
||||||
|
group = cls.find_or_create_group(group_identifier)
|
||||||
|
UserService.add_user_to_group(user, group)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def remove_user_from_group(cls, user: UserModel, group_identifier: str) -> None:
|
||||||
|
user_group_assignment = (
|
||||||
|
UserGroupAssignmentModel.query.filter_by(user_id=user.id)
|
||||||
|
.join(
|
||||||
|
GroupModel,
|
||||||
|
and_(GroupModel.id == UserGroupAssignmentModel.group_id, GroupModel.identifier == group_identifier),
|
||||||
|
)
|
||||||
|
.first()
|
||||||
|
)
|
||||||
|
if user_group_assignment is None:
|
||||||
|
raise (UserGroupAssignmentNotFoundError(f"User ({user.username}) is not in group ({group_identifier})"))
|
||||||
|
db.session.delete(user_group_assignment)
|
||||||
|
db.session.commit()
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def find_or_create_guest_user(
|
def find_or_create_guest_user(
|
||||||
cls, username: str = SPIFF_GUEST_USER, group_identifier: str = SPIFF_GUEST_GROUP
|
cls, username: str = SPIFF_GUEST_USER, group_identifier: str = SPIFF_GUEST_GROUP
|
||||||
|
|
|
@ -55,8 +55,10 @@ class BaseTest:
|
||||||
)
|
)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def logged_in_headers(user: UserModel, _redirect_url: str = "http://some/frontend/url") -> dict[str, str]:
|
def logged_in_headers(
|
||||||
return {"Authorization": "Bearer " + user.encode_auth_token()}
|
user: UserModel, _redirect_url: str = "http://some/frontend/url", extra_token_payload: dict | None = None
|
||||||
|
) -> dict[str, str]:
|
||||||
|
return {"Authorization": "Bearer " + user.encode_auth_token(extra_token_payload)}
|
||||||
|
|
||||||
def create_group_and_model_with_bpmn(
|
def create_group_and_model_with_bpmn(
|
||||||
self,
|
self,
|
||||||
|
|
|
@ -1,7 +1,14 @@
|
||||||
import ast
|
import ast
|
||||||
import base64
|
import base64
|
||||||
|
import time
|
||||||
|
|
||||||
|
from flask.app import Flask
|
||||||
|
from flask.testing import FlaskClient
|
||||||
|
from spiffworkflow_backend.models.db import db
|
||||||
|
from spiffworkflow_backend.models.user import UserModel
|
||||||
from spiffworkflow_backend.services.authentication_service import AuthenticationService
|
from spiffworkflow_backend.services.authentication_service import AuthenticationService
|
||||||
|
from spiffworkflow_backend.services.authorization_service import AuthorizationService
|
||||||
|
from spiffworkflow_backend.services.authorization_service import GroupPermissionsDict
|
||||||
|
|
||||||
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
|
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
|
||||||
|
|
||||||
|
@ -16,153 +23,65 @@ class TestAuthentication(BaseTest):
|
||||||
assert "redirect_url" in state_dict.keys()
|
assert "redirect_url" in state_dict.keys()
|
||||||
assert state_dict["redirect_url"] == redirect_url
|
assert state_dict["redirect_url"] == redirect_url
|
||||||
|
|
||||||
# def test_get_login_redirect_url(self):
|
def test_properly_adds_user_to_groups_from_token_on_login(
|
||||||
# redirect_url = "http://example.com/"
|
self,
|
||||||
# state = AuthenticationService.generate_state(redirect_url)
|
app: Flask,
|
||||||
# with current_app.app_context():
|
client: FlaskClient,
|
||||||
# login_redirect_url = AuthenticationService().get_login_redirect_url(state.decode("UTF-8"))
|
with_db_and_bpmn_file_cleanup: None,
|
||||||
# print("test_get_login_redirect_url")
|
) -> None:
|
||||||
# print("test_get_login_redirect_url")
|
user = self.find_or_create_user("testing@e.com")
|
||||||
|
user.email = "testing@e.com"
|
||||||
|
user.service = app.config["SPIFFWORKFLOW_BACKEND_OPEN_ID_SERVER_URL"]
|
||||||
|
db.session.add(user)
|
||||||
|
db.session.commit()
|
||||||
|
|
||||||
# def test_get_token_script(self, app: Flask) -> None:
|
access_token = user.encode_auth_token(
|
||||||
# """Test_get_token_script."""
|
{
|
||||||
# print("Test Get Token Script")
|
"groups": ["group_one", "group_two"],
|
||||||
#
|
"iss": app.config["SPIFFWORKFLOW_BACKEND_OPEN_ID_SERVER_URL"],
|
||||||
# (
|
"aud": "spiffworkflow-backend",
|
||||||
# keycloak_server_url,
|
"iat": round(time.time()),
|
||||||
# keycloak_client_id,
|
"exp": round(time.time()) + 1000,
|
||||||
# keycloak_realm_name,
|
}
|
||||||
# keycloak_client_secret_key,
|
)
|
||||||
# ) = self.get_keycloak_constants(app)
|
response = client.post(
|
||||||
# keycloak_user = "ciuser1"
|
f"/v1.0/login_with_access_token?access_token={access_token}",
|
||||||
# keycloak_pass = "ciuser1" # noqa: S105
|
)
|
||||||
#
|
assert response.status_code == 200
|
||||||
# print(f"Test Get Token Script: keycloak_server_url: {keycloak_server_url}")
|
assert len(user.groups) == 3
|
||||||
# print(f"Test Get Token Script: keycloak_client_id: {keycloak_client_id}")
|
group_identifiers = [g.identifier for g in user.groups]
|
||||||
# print(f"Test Get Token Script: keycloak_realm_name: {keycloak_realm_name}")
|
assert sorted(group_identifiers) == ["everybody", "group_one", "group_two"]
|
||||||
# print(
|
|
||||||
# f"Test Get Token Script: keycloak_client_secret_key: {keycloak_client_secret_key}"
|
access_token = user.encode_auth_token(
|
||||||
# )
|
{
|
||||||
#
|
"groups": ["group_one"],
|
||||||
# frontend_client_id = "spiffworkflow-frontend"
|
"iss": app.config["SPIFFWORKFLOW_BACKEND_OPEN_ID_SERVER_URL"],
|
||||||
#
|
"aud": "spiffworkflow-backend",
|
||||||
# print(f"Test Get Token Script: frontend_client_id: {frontend_client_id}")
|
"iat": round(time.time()),
|
||||||
#
|
"exp": round(time.time()) + 1000,
|
||||||
# # Get frontend token
|
}
|
||||||
# request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token"
|
)
|
||||||
# headers = {"Content-Type": "application/x-www-form-urlencoded"}
|
response = client.post(
|
||||||
# post_data = {
|
f"/v1.0/login_with_access_token?access_token={access_token}",
|
||||||
# "grant_type": "password",
|
)
|
||||||
# "username": keycloak_user,
|
assert response.status_code == 200
|
||||||
# "password": keycloak_pass,
|
user = UserModel.query.filter_by(username=user.username).first()
|
||||||
# "client_id": frontend_client_id,
|
assert len(user.groups) == 2
|
||||||
# }
|
group_identifiers = [g.identifier for g in user.groups]
|
||||||
# print(f"Test Get Token Script: request_url: {request_url}")
|
assert sorted(group_identifiers) == ["everybody", "group_one"]
|
||||||
# print(f"Test Get Token Script: headers: {headers}")
|
|
||||||
# print(f"Test Get Token Script: post_data: {post_data}")
|
# make sure running refresh_permissions doesn't remove the user from the group
|
||||||
#
|
group_info: list[GroupPermissionsDict] = [
|
||||||
# frontend_response = requests.post(
|
{
|
||||||
# request_url, headers=headers, json=post_data, data=post_data
|
"users": [],
|
||||||
# )
|
"name": "group_one",
|
||||||
# frontend_token = json.loads(frontend_response.text)
|
"permissions": [{"actions": ["create", "read"], "uri": "PG:hey"}],
|
||||||
#
|
}
|
||||||
# print(f"Test Get Token Script: frontend_response: {frontend_response}")
|
]
|
||||||
# print(f"Test Get Token Script: frontend_token: {frontend_token}")
|
AuthorizationService.refresh_permissions(group_info, group_permissions_only=True)
|
||||||
#
|
user = UserModel.query.filter_by(username=user.username).first()
|
||||||
# # assert isinstance(frontend_token, dict)
|
assert len(user.groups) == 2
|
||||||
# # assert isinstance(frontend_token["access_token"], str)
|
group_identifiers = [g.identifier for g in user.groups]
|
||||||
# # assert isinstance(frontend_token["refresh_token"], str)
|
assert sorted(group_identifiers) == ["everybody", "group_one"]
|
||||||
# # assert frontend_token["expires_in"] == 300
|
self.assert_user_has_permission(user, "read", "/v1.0/process-groups/hey")
|
||||||
# # assert frontend_token["refresh_expires_in"] == 1800
|
self.assert_user_has_permission(user, "read", "/v1.0/process-groups/hey:yo")
|
||||||
# # assert frontend_token["token_type"] == "Bearer"
|
|
||||||
#
|
|
||||||
# # Get backend token
|
|
||||||
# backend_basic_auth_string = f"{keycloak_client_id}:{keycloak_client_secret_key}"
|
|
||||||
# backend_basic_auth_bytes = bytes(backend_basic_auth_string, encoding="ascii")
|
|
||||||
# backend_basic_auth = base64.b64encode(backend_basic_auth_bytes)
|
|
||||||
#
|
|
||||||
# request_url = f"{keycloak_server_url}/realms/{keycloak_realm_name}/protocol/openid-connect/token"
|
|
||||||
# headers = {
|
|
||||||
# "Content-Type": "application/x-www-form-urlencoded",
|
|
||||||
# "Authorization": f"Basic {backend_basic_auth.decode('utf-8')}",
|
|
||||||
# }
|
|
||||||
# data = {
|
|
||||||
# "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
|
|
||||||
# "client_id": keycloak_client_id,
|
|
||||||
# "subject_token": frontend_token["access_token"],
|
|
||||||
# "audience": keycloak_client_id,
|
|
||||||
# }
|
|
||||||
# print(f"Test Get Token Script: request_url: {request_url}")
|
|
||||||
# print(f"Test Get Token Script: headers: {headers}")
|
|
||||||
# print(f"Test Get Token Script: data: {data}")
|
|
||||||
#
|
|
||||||
# backend_response = requests.post(request_url, headers=headers, data=data)
|
|
||||||
# json_data = json.loads(backend_response.text)
|
|
||||||
# backend_token = json_data["access_token"]
|
|
||||||
# print(f"Test Get Token Script: backend_response: {backend_response}")
|
|
||||||
# print(f"Test Get Token Script: backend_token: {backend_token}")
|
|
||||||
#
|
|
||||||
# if backend_token:
|
|
||||||
# # Getting resource set
|
|
||||||
# auth_bearer_string = f"Bearer {backend_token}"
|
|
||||||
# headers = {
|
|
||||||
# "Content-Type": "application/json",
|
|
||||||
# "Authorization": auth_bearer_string,
|
|
||||||
# }
|
|
||||||
#
|
|
||||||
# # uri_to_test_against = "%2Fprocess-models"
|
|
||||||
# uri_to_test_against = "/status"
|
|
||||||
# request_url = (
|
|
||||||
# f"{keycloak_server_url}/realms/{keycloak_realm_name}/authz/protection/resource_set?"
|
|
||||||
# + f"matchingUri=true&deep=true&max=-1&exactName=false&uri={uri_to_test_against}"
|
|
||||||
# )
|
|
||||||
# # f"uri={uri_to_test_against}"
|
|
||||||
# print(f"Test Get Token Script: request_url: {request_url}")
|
|
||||||
# print(f"Test Get Token Script: headers: {headers}")
|
|
||||||
#
|
|
||||||
# resource_result = requests.get(request_url, headers=headers)
|
|
||||||
# print(f"Test Get Token Script: resource_result: {resource_result}")
|
|
||||||
#
|
|
||||||
# json_data = json.loads(resource_result.text)
|
|
||||||
# resource_id_name_pairs = []
|
|
||||||
# for result in json_data:
|
|
||||||
# if "_id" in result and result["_id"]:
|
|
||||||
# pair_key = result["_id"]
|
|
||||||
# if "name" in result and result["name"]:
|
|
||||||
# pair_value = result["name"]
|
|
||||||
# # pair = {{result['_id']}: {}}
|
|
||||||
# else:
|
|
||||||
# pair_value = "no_name"
|
|
||||||
# # pair = {{result['_id']}: }
|
|
||||||
# pair = [pair_key, pair_value]
|
|
||||||
# resource_id_name_pairs.append(pair)
|
|
||||||
# print(
|
|
||||||
# f"Test Get Token Script: resource_id_name_pairs: {resource_id_name_pairs}"
|
|
||||||
# )
|
|
||||||
#
|
|
||||||
# # Getting Permissions
|
|
||||||
# for resource_id_name_pair in resource_id_name_pairs:
|
|
||||||
# resource_id = resource_id_name_pair[0]
|
|
||||||
# resource_id_name_pair[1]
|
|
||||||
#
|
|
||||||
# headers = {
|
|
||||||
# "Content-Type": "application/x-www-form-urlencoded",
|
|
||||||
# "Authorization": f"Basic {backend_basic_auth.decode('utf-8')}",
|
|
||||||
# }
|
|
||||||
#
|
|
||||||
# post_data = {
|
|
||||||
# "audience": keycloak_client_id,
|
|
||||||
# "permission": resource_id,
|
|
||||||
# "subject_token": backend_token,
|
|
||||||
# "grant_type": "urn:ietf:params:oauth:grant-type:uma-ticket",
|
|
||||||
# }
|
|
||||||
# print(f"Test Get Token Script: headers: {headers}")
|
|
||||||
# print(f"Test Get Token Script: post_data: {post_data}")
|
|
||||||
# print(f"Test Get Token Script: request_url: {request_url}")
|
|
||||||
#
|
|
||||||
# permission_result = requests.post(
|
|
||||||
# request_url, headers=headers, data=post_data
|
|
||||||
# )
|
|
||||||
# print(f"Test Get Token Script: permission_result: {permission_result}")
|
|
||||||
#
|
|
||||||
# print("test_get_token_script")
|
|
||||||
|
|
Loading…
Reference in New Issue