add SPIFFWORKFLOW_BACKEND_OPEN_ID_IS_AUTHORITY_FOR_USER_GROUPS and default to false

This commit is contained in:
burnettk 2023-09-08 12:54:32 -04:00
parent 948c633b2c
commit 4e47eadfea
3 changed files with 63 additions and 55 deletions

View File

@ -88,6 +88,10 @@ SPIFFWORKFLOW_BACKEND_OPEN_ID_TENANT_SPECIFIC_FIELDS = environ.get(
"SPIFFWORKFLOW_BACKEND_OPEN_ID_TENANT_SPECIFIC_FIELDS" "SPIFFWORKFLOW_BACKEND_OPEN_ID_TENANT_SPECIFIC_FIELDS"
) )
SPIFFWORKFLOW_BACKEND_OPEN_ID_IS_AUTHORITY_FOR_USER_GROUPS = (
environ.get("SPIFFWORKFLOW_BACKEND_OPEN_ID_IS_AUTHORITY_FOR_USER_GROUPS", default="false") == "true"
)
SPIFFWORKFLOW_BACKEND_AUTHENTICATION_DISABLED = ( SPIFFWORKFLOW_BACKEND_AUTHENTICATION_DISABLED = (
environ.get("SPIFFWORKFLOW_BACKEND_AUTHENTICATION_DISABLED", default="false") == "true" environ.get("SPIFFWORKFLOW_BACKEND_AUTHENTICATION_DISABLED", default="false") == "true"
) )

View File

@ -435,8 +435,10 @@ class AuthorizationService:
user_attributes["service_id"] = user_info["sub"] user_attributes["service_id"] = user_info["sub"]
desired_group_identifiers = None desired_group_identifiers = None
if "groups" in user_info:
desired_group_identifiers = user_info["groups"] if current_app.config["SPIFFWORKFLOW_BACKEND_OPEN_ID_IS_AUTHORITY_FOR_USER_GROUPS"]:
if "groups" in user_info:
desired_group_identifiers = user_info["groups"]
for field_index, tenant_specific_field in enumerate( for field_index, tenant_specific_field in enumerate(
current_app.config["SPIFFWORKFLOW_BACKEND_OPEN_ID_TENANT_SPECIFIC_FIELDS"] current_app.config["SPIFFWORKFLOW_BACKEND_OPEN_ID_TENANT_SPECIFIC_FIELDS"]

View File

@ -29,59 +29,61 @@ class TestAuthentication(BaseTest):
client: FlaskClient, client: FlaskClient,
with_db_and_bpmn_file_cleanup: None, with_db_and_bpmn_file_cleanup: None,
) -> None: ) -> None:
user = self.find_or_create_user("testing@e.com") with self.app_config_mock(app, "SPIFFWORKFLOW_BACKEND_OPEN_ID_IS_AUTHORITY_FOR_USER_GROUPS", True):
user.email = "testing@e.com" user = self.find_or_create_user("testing@e.com")
user.service = app.config["SPIFFWORKFLOW_BACKEND_OPEN_ID_SERVER_URL"] user.email = "testing@e.com"
db.session.add(user) user.service = app.config["SPIFFWORKFLOW_BACKEND_OPEN_ID_SERVER_URL"]
db.session.commit() db.session.add(user)
db.session.commit()
access_token = user.encode_auth_token( access_token = user.encode_auth_token(
{ {
"groups": ["group_one", "group_two"], "groups": ["group_one", "group_two"],
"iss": app.config["SPIFFWORKFLOW_BACKEND_OPEN_ID_SERVER_URL"], "iss": app.config["SPIFFWORKFLOW_BACKEND_OPEN_ID_SERVER_URL"],
"aud": "spiffworkflow-backend", "aud": "spiffworkflow-backend",
"iat": round(time.time()), "iat": round(time.time()),
"exp": round(time.time()) + 1000, "exp": round(time.time()) + 1000,
} }
) )
response = client.post( response = None
f"/v1.0/login_with_access_token?access_token={access_token}", response = client.post(
) f"/v1.0/login_with_access_token?access_token={access_token}",
assert response.status_code == 200 )
assert len(user.groups) == 3 assert response.status_code == 200
group_identifiers = [g.identifier for g in user.groups] assert len(user.groups) == 3
assert sorted(group_identifiers) == ["everybody", "group_one", "group_two"] group_identifiers = [g.identifier for g in user.groups]
assert sorted(group_identifiers) == ["everybody", "group_one", "group_two"]
access_token = user.encode_auth_token( access_token = user.encode_auth_token(
{ {
"groups": ["group_one"], "groups": ["group_one"],
"iss": app.config["SPIFFWORKFLOW_BACKEND_OPEN_ID_SERVER_URL"], "iss": app.config["SPIFFWORKFLOW_BACKEND_OPEN_ID_SERVER_URL"],
"aud": "spiffworkflow-backend", "aud": "spiffworkflow-backend",
"iat": round(time.time()), "iat": round(time.time()),
"exp": round(time.time()) + 1000, "exp": round(time.time()) + 1000,
} }
) )
response = client.post( response = client.post(
f"/v1.0/login_with_access_token?access_token={access_token}", f"/v1.0/login_with_access_token?access_token={access_token}",
) )
assert response.status_code == 200 assert response.status_code == 200
user = UserModel.query.filter_by(username=user.username).first() user = UserModel.query.filter_by(username=user.username).first()
assert len(user.groups) == 2 assert len(user.groups) == 2
group_identifiers = [g.identifier for g in user.groups] group_identifiers = [g.identifier for g in user.groups]
assert sorted(group_identifiers) == ["everybody", "group_one"] assert sorted(group_identifiers) == ["everybody", "group_one"]
# make sure running refresh_permissions doesn't remove the user from the group # make sure running refresh_permissions doesn't remove the user from the group
group_info: list[GroupPermissionsDict] = [ group_info: list[GroupPermissionsDict] = [
{ {
"users": [], "users": [],
"name": "group_one", "name": "group_one",
"permissions": [{"actions": ["create", "read"], "uri": "PG:hey"}], "permissions": [{"actions": ["create", "read"], "uri": "PG:hey"}],
} }
] ]
AuthorizationService.refresh_permissions(group_info, group_permissions_only=True) AuthorizationService.refresh_permissions(group_info, group_permissions_only=True)
user = UserModel.query.filter_by(username=user.username).first() user = UserModel.query.filter_by(username=user.username).first()
assert len(user.groups) == 2 assert len(user.groups) == 2
group_identifiers = [g.identifier for g in user.groups] group_identifiers = [g.identifier for g in user.groups]
assert sorted(group_identifiers) == ["everybody", "group_one"] assert sorted(group_identifiers) == ["everybody", "group_one"]
self.assert_user_has_permission(user, "read", "/v1.0/process-groups/hey") self.assert_user_has_permission(user, "read", "/v1.0/process-groups/hey")
self.assert_user_has_permission(user, "read", "/v1.0/process-groups/hey:yo") self.assert_user_has_permission(user, "read", "/v1.0/process-groups/hey:yo")