started adding a test for update permission usage

This commit is contained in:
jasquat 2022-10-11 09:44:32 -04:00
parent a074b2dc0c
commit e9442cf27a
16 changed files with 126 additions and 51 deletions

View File

@ -7,7 +7,7 @@ repos:
language: system
types: [python]
require_serial: true
exclude: ^migrations/
exclude: "(^migrations/|^load_database_models.)"
- id: check-added-large-files
name: Check for added large files
entry: check-added-large-files

View File

@ -24,13 +24,19 @@ fi
docker compose --profile "$SPIFFWORKFLOW_BACKEND_DOCKER_COMPOSE_PROFILE" build
docker compose --profile "$SPIFFWORKFLOW_BACKEND_DOCKER_COMPOSE_PROFILE" stop
# i observed a case locally where the db had a stale sqlalchemy revision which
# caused the backend to exit and when docker compose up was running with
# --wait, it just said waiting forever (like we have seen in CI). so removing
# the volume would work around that case, if the volumes are not cleaned up in
# CI. also removing the wait prevents it from hanging forever in the case where
# the backend crashes, so then we'll just wait for the timeout to happen in the
# bin/wait_for_server_to_be_up script.
docker volume rm spiffworkflow-backend_spiffworkflow_backend || echo 'docker volume not found'
if [[ "${SPIFFWORKFLOW_BACKEND_RECREATE_DATABASE:-}" == "true" ]]; then
docker stop db
docker rm db
docker volume rm spiffworkflow-backend_spiffworkflow_backend
# i observed a case locally where the db had a stale sqlalchemy revision which
# caused the backend to exit and when docker compose up was running with
# --wait, it just said waiting forever (like we have seen in CI). so removing
# the volume would work around that case, if the volumes are not cleaned up in
# CI. also removing the wait prevents it from hanging forever in the case where
# the backend crashes, so then we'll just wait for the timeout to happen in the
# bin/wait_for_server_to_be_up script.
docker volume rm spiffworkflow-backend_spiffworkflow_backend || echo 'docker volume not found'
fi
docker compose --profile "$SPIFFWORKFLOW_BACKEND_DOCKER_COMPOSE_PROFILE" up --wait $additional_args

View File

@ -10,6 +10,7 @@ from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor,
)
@ -34,6 +35,7 @@ from spiffworkflow_backend import create_app # noqa: E402
def app() -> Flask:
"""App."""
os.environ["SPIFFWORKFLOW_BACKEND_ENV"] = "testing"
# os.environ["FLASK_SESSION_SECRET_KEY"] = "this_is_testing_secret_key"
os.environ["FLASK_SESSION_SECRET_KEY"] = "super_secret_key"
app = create_app()
@ -54,7 +56,10 @@ def app() -> Flask:
@pytest.fixture()
def with_db_and_bpmn_file_cleanup() -> None:
"""Process_group_resource."""
# db.session.query(UserModel).delete()
for model in SpiffworkflowBaseDBModel._all_subclasses():
print(f"model: {model}")
db.session.query(model).delete()
try:

View File

@ -1,8 +1,8 @@
"""empty message
Revision ID: 34b445e106af
Revision ID: 88e30afd19ac
Revises:
Create Date: 2022-10-07 17:08:44.808209
Create Date: 2022-10-11 09:39:40.882490
"""
from alembic import op
@ -10,7 +10,7 @@ import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '34b445e106af'
revision = '88e30afd19ac'
down_revision = None
branch_labels = None
depends_on = None
@ -35,7 +35,7 @@ def upgrade():
op.create_table('group',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('name', sa.String(length=255), nullable=True),
sa.Column('new_name_two', sa.String(length=255), nullable=True),
sa.Column('identifier', sa.String(length=255), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_table('message_model',

View File

@ -111,6 +111,4 @@ def create_app() -> flask.app.Flask:
if app.config["PROCESS_WAITING_MESSAGES"]:
start_scheduler(app)
print(f"app.debug: {app.debug}")
return app # type: ignore

View File

@ -47,11 +47,9 @@ def setup_config(app: Flask) -> None:
)
app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False
app.config.from_object("spiffworkflow_backend.config.default")
# This allows config/testing.py or instance/config.py to override the default config
if (
"SPIFFWORKFLOW_BACKEND_ENV" in app.config
and app.config["SPIFFWORKFLOW_BACKEND_ENV"] == "testing"
):
if "ENV_IDENTIFIER" in app.config and app.config["ENV_IDENTIFIER"] == "testing":
app.config.from_pyfile("config/testing.py", silent=True)
else:
app.config.from_pyfile(f"{app.instance_path}/config.py", silent=True)

View File

@ -4,6 +4,12 @@ autoflake8 will remove these lines without the noqa comment
"""
from flask_bpmn.models.db import add_listeners
# must load this before UserModel and GroupModel for relationships
from spiffworkflow_backend.models.user_group_assignment import (
UserGroupAssignmentModel,
) # noqa: F401
from spiffworkflow_backend.models.active_task import ActiveTaskModel # noqa: F401
from spiffworkflow_backend.models.bpmn_process_id_lookup import (
BpmnProcessIdLookup,
@ -40,8 +46,6 @@ from spiffworkflow_backend.models.secret_model import SecretModel # noqa: F401
from spiffworkflow_backend.models.spiff_logging import SpiffLoggingModel # noqa: F401
from spiffworkflow_backend.models.task_event import TaskEventModel # noqa: F401
from spiffworkflow_backend.models.user import UserModel # noqa: F401
from spiffworkflow_backend.models.user_group_assignment import (
UserGroupAssignmentModel,
) # noqa: F401
from spiffworkflow_backend.models.group import GroupModel # noqa: F401
add_listeners()

View File

@ -19,7 +19,9 @@ class GroupModel(FlaskBpmnGroupModel):
__tablename__ = "group"
__table_args__ = {"extend_existing": True}
new_name_two = db.Column(db.String(255))
identifier = db.Column(db.String(255))
user_group_assignments = relationship("UserGroupAssignmentModel", cascade="delete")
users = relationship( # type: ignore
"UserModel",

View File

@ -12,17 +12,19 @@ from sqlalchemy.orm import relationship
from sqlalchemy.orm import validates
from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.user_group_assignment import UserGroupAssignmentModel
from spiffworkflow_backend.services.authentication_service import (
AuthenticationProviderTypes,
)
# from spiffworkflow_backend.models.user_group_assignment import UserGroupAssignmentModel
class UserModel(SpiffworkflowBaseDBModel):
"""UserModel."""
__tablename__ = "user"
__table_args__ = (db.UniqueConstraint("service", "service_id", name="service_key"),)
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(255), nullable=False, unique=True)
uid = db.Column(db.String(50), unique=True)
@ -30,7 +32,8 @@ class UserModel(SpiffworkflowBaseDBModel):
service_id = db.Column(db.String(255), nullable=False, unique=False)
name = db.Column(db.String(255))
email = db.Column(db.String(255))
user_group_assignments = relationship(UserGroupAssignmentModel, cascade="delete")
user_group_assignments = relationship("UserGroupAssignmentModel", cascade="delete")
groups = relationship( # type: ignore
GroupModel,
viewonly=True,

View File

@ -4,6 +4,9 @@ from flask_bpmn.models.db import SpiffworkflowBaseDBModel
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship
from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.user import UserModel
class UserGroupAssignmentModel(SpiffworkflowBaseDBModel):
"""UserGroupAssignmentModel."""
@ -12,8 +15,10 @@ class UserGroupAssignmentModel(SpiffworkflowBaseDBModel):
__table_args__ = (
db.UniqueConstraint("user_id", "group_id", name="user_group_assignment_unique"),
)
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(ForeignKey("user.id"), nullable=False)
group_id = db.Column(ForeignKey("group.id"), nullable=False)
user_id = db.Column(ForeignKey(UserModel.id), nullable=False)
group_id = db.Column(ForeignKey(GroupModel.id), nullable=False)
group = relationship("GroupModel", overlaps="groups,user_group_assignments,users") # type: ignore
user = relationship("UserModel", overlaps="groups,user_group_assignments,users") # type: ignore

View File

@ -142,7 +142,7 @@ def process_model_save(process_model_id: str, file_name: str) -> Union[str, Resp
@admin_blueprint.route("/process-models/<process_model_id>/run", methods=["GET"])
def process_model_run(process_model_id: str) -> Union[str, Response]:
"""Process_model_run."""
user = UserService().create_user("internal", "Mr. Test", username="Mr. Test")
user = UserService.create_user("internal", "Mr. Test", username="Mr. Test")
process_instance = ProcessInstanceService.create_process_instance(
process_model_id, user
)

View File

@ -237,7 +237,7 @@ def login_return(code: str, state: str, session_state: str) -> Optional[Response
username = user_info["preferred_username"]
if "email" in user_info:
email = user_info["email"]
user_model = UserService().create_user(
user_model = UserService.create_user(
service="open_id",
service_id=user_info["sub"],
name=name,

View File

@ -21,23 +21,25 @@ class AuthorizationService:
) -> bool:
"""Has_permission."""
principal_ids = [p.id for p in principals]
permission_assignment = (
permission_assignments = (
PermissionAssignmentModel.query.filter(
PermissionAssignmentModel.principal_id.in_(principal_ids)
)
.filter_by(permission=permission)
.join(PermissionTargetModel)
.filter_by(uri=target_uri)
.first()
.all()
)
if permission_assignment is None:
return False
if permission_assignment.grant_type.value == "permit":
return True
elif permission_assignment.grant_type.value == "deny":
return False
else:
raise Exception("Unknown grant type")
for permission_assignment in permission_assignments:
if permission_assignment.grant_type.value == "permit":
return True
elif permission_assignment.grant_type.value == "deny":
return False
else:
raise Exception("Unknown grant type")
return False
@classmethod
def user_has_permission(

View File

@ -7,16 +7,19 @@ from flask import g
from flask_bpmn.api.api_error import ApiError
from flask_bpmn.models.db import db
from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.principal import PrincipalModel
from spiffworkflow_backend.models.user import AdminSessionModel
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.models.user_group_assignment import UserGroupAssignmentModel
class UserService:
"""Provides common tools for working with users."""
@classmethod
def create_user(
self,
cls,
service: str,
service_id: str,
name: Optional[str] = "",
@ -49,7 +52,7 @@ class UserService:
raise ApiError(
code="add_user_error", message=f"Could not add user {username}"
) from e
self.create_principal(user_model.id)
cls.create_principal(user_model.id)
return user_model
else:
@ -64,8 +67,9 @@ class UserService:
)
)
@classmethod
def find_or_create_user(
self,
cls,
service: str,
service_id: str,
name: Optional[str] = None,
@ -75,7 +79,7 @@ class UserService:
"""Find_or_create_user."""
user_model: UserModel
try:
user_model = self.create_user(
user_model = cls.create_user(
service=service,
service_id=service_id,
name=name,
@ -264,7 +268,8 @@ class UserService:
message=f"No principal was found for user_id: {user_id}",
)
def create_principal(self, user_id: int) -> PrincipalModel:
@classmethod
def create_principal(cls, user_id: int) -> PrincipalModel:
"""Create_principal."""
principal: Optional[PrincipalModel] = PrincipalModel.query.filter_by(
user_id=user_id
@ -282,3 +287,10 @@ class UserService:
message=f"Could not create principal {user_id}",
) from e
return principal
@classmethod
def add_user_to_group(cls, user: UserModel, group: GroupModel) -> None:
"""Add_user_to_group."""
ugam = UserGroupAssignmentModel(user_id=user.id, group_id=group.id)
db.session.add(ugam)
db.session.commit()

View File

@ -38,7 +38,7 @@ class BaseTest:
if isinstance(user, UserModel):
return user
user = UserService().create_user("internal", username, username=username)
user = UserService.create_user("internal", username, username=username)
if isinstance(user, UserModel):
return user

View File

@ -4,9 +4,11 @@ from flask_bpmn.models.db import db
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.permission_assignment import PermissionAssignmentModel
from spiffworkflow_backend.models.permission_target import PermissionTargetModel
from spiffworkflow_backend.services.authorization_service import AuthorizationService
from spiffworkflow_backend.services.user_service import UserService
# we think we can get the list of roles for a user.
@ -57,7 +59,6 @@ class TestPermissions(BaseTest):
process_group_id=process_group_id,
)
group_a_admin = self.find_or_create_user()
principal = group_a_admin.principal
permission_target = PermissionTargetModel(uri=f"/{process_group_a_id}")
db.session.add(permission_target)
@ -65,22 +66,61 @@ class TestPermissions(BaseTest):
permission_assignment = PermissionAssignmentModel(
permission_target_id=permission_target.id,
principal_id=principal.id,
principal_id=group_a_admin.principal.id,
permission="update",
grant_type="permit",
)
db.session.add(permission_assignment)
db.session.commit()
has_permission_to_a = AuthorizationService.has_permission(
principals=[principal],
has_permission_to_a = AuthorizationService.user_has_permission(
user=group_a_admin,
permission="update",
target_uri=f"/{process_group_a_id}",
)
assert has_permission_to_a is True
has_permission_to_b = AuthorizationService.has_permission(
principals=[principal],
has_permission_to_b = AuthorizationService.user_has_permission(
user=group_a_admin,
permission="update",
target_uri=f"/{process_group_b_id}",
)
assert has_permission_to_b is False
def test_user_can_be_granted_access_through_a_group(
self, app: Flask, with_db_and_bpmn_file_cleanup: None
) -> None:
"""Test_group_a_admin_needs_to_stay_away_from_group_b."""
process_group_ids = ["group-a", "group-b"]
process_group_a_id = process_group_ids[0]
process_group_ids[1]
for process_group_id in process_group_ids:
load_test_spec(
"timers_intermediate_catch_event",
process_group_id=process_group_id,
)
user = self.find_or_create_user()
group = GroupModel(identifier="groupA")
db.session.add(group)
db.session.commit()
UserService.add_user_to_group(user, group)
permission_target = PermissionTargetModel(uri=f"/{process_group_a_id}")
db.session.add(permission_target)
db.session.commit()
permission_assignment = PermissionAssignmentModel(
permission_target_id=permission_target.id,
principal_id=group.principal.id,
permission="update",
grant_type="permit",
)
db.session.add(permission_assignment)
db.session.commit()
has_permission_to_a = AuthorizationService.user_has_permission(
user=user,
permission="update",
target_uri=f"/{process_group_a_id}",
)
assert has_permission_to_a is True