one has_permission test passes. w/ mike

This commit is contained in:
burnettk 2022-10-07 17:10:22 -04:00
parent f65e342add
commit 5f708fe41d
6 changed files with 51 additions and 21 deletions

View File

@ -1,5 +1,3 @@
from __future__ import with_statement
import logging
from logging.config import fileConfig

View File

@ -1,8 +1,8 @@
"""empty message
Revision ID: 399879a03735
Revision ID: 34b445e106af
Revises:
Create Date: 2022-10-07 16:39:31.688247
Create Date: 2022-10-07 17:08:44.808209
"""
from alembic import op
@ -10,7 +10,7 @@ import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '399879a03735'
revision = '34b445e106af'
down_revision = None
branch_labels = None
depends_on = None
@ -230,7 +230,8 @@ def upgrade():
sa.Column('permission', sa.Enum('create', 'read', 'update', 'delete', 'list', 'instantiate', name='permission'), nullable=True),
sa.ForeignKeyConstraint(['permission_target_id'], ['permission_target.id'], ),
sa.ForeignKeyConstraint(['principal_id'], ['principal.id'], ),
sa.PrimaryKeyConstraint('id')
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('principal_id', 'permission_target_id', 'permission', name='permission_assignment_uniq')
)
op.create_table('secret_allowed_process',
sa.Column('id', sa.Integer(), nullable=False),

View File

@ -38,6 +38,14 @@ class PermissionAssignmentModel(SpiffworkflowBaseDBModel):
"""PermissionAssignmentModel."""
__tablename__ = "permission_assignment"
__table_args__ = (
db.UniqueConstraint(
"principal_id",
"permission_target_id",
"permission",
name="permission_assignment_uniq",
),
)
id = db.Column(db.Integer, primary_key=True)
principal_id = db.Column(ForeignKey(PrincipalModel.id), nullable=False)
permission_target_id = db.Column(

View File

@ -99,7 +99,7 @@ class PublicAuthenticationService:
open_id_client_id,
open_id_realm_name,
open_id_client_secret_key,
) = get_open_id_args()
) = PublicAuthenticationService.get_open_id_args()
request_url = (
f"{open_id_server_url}/realms/{open_id_realm_name}/protocol/openid-connect/logout?"
+ f"post_logout_redirect_uri={return_redirect_url}&"
@ -121,7 +121,7 @@ class PublicAuthenticationService:
open_id_client_id,
open_id_realm_name,
open_id_client_secret_key,
) = get_open_id_args()
) = PublicAuthenticationService.get_open_id_args()
return_redirect_url = f"{self.get_backend_url()}/v1.0/login_return"
login_redirect_url = (
f"{open_id_server_url}/realms/{open_id_realm_name}/protocol/openid-connect/auth?"
@ -140,7 +140,7 @@ class PublicAuthenticationService:
open_id_client_id,
open_id_realm_name,
open_id_client_secret_key,
) = get_open_id_args()
) = PublicAuthenticationService.get_open_id_args()
backend_basic_auth_string = f"{open_id_client_id}:{open_id_client_secret_key}"
backend_basic_auth_bytes = bytes(backend_basic_auth_string, encoding="ascii")
@ -161,8 +161,8 @@ class PublicAuthenticationService:
id_token_object: dict = json.loads(response.text)
return id_token_object
@staticmethod
def validate_id_token(id_token: str) -> bool:
@classmethod
def validate_id_token(cls, id_token: str) -> bool:
"""Https://openid.net/specs/openid-connect-core-1_0.html#IDTokenValidation."""
valid = True
now = time.time()
@ -171,7 +171,7 @@ class PublicAuthenticationService:
open_id_client_id,
open_id_realm_name,
open_id_client_secret_key,
) = get_open_id_args()
) = cls.get_open_id_args()
try:
decoded_token = jwt.decode(id_token, options={"verify_signature": False})
except Exception as e:

View File

@ -1,14 +1,13 @@
"""Authorization_service."""
import base64
import json
from typing import Union
import jwt
import requests
from flask import current_app
from flask_bpmn.api.api_error import ApiError
from spiffworkflow_backend.models.permission_assignment import PermissionAssignmentModel
from spiffworkflow_backend.models.permission_target import PermissionTargetModel
from spiffworkflow_backend.models.principal import PrincipalModel
class AuthorizationService:
@ -16,12 +15,28 @@ class AuthorizationService:
@staticmethod
def has_permission(
principal: 'PrincipalModel', permission: str, target_uri: str
principal: PrincipalModel, permission: str, target_uri: str
) -> bool:
"""Has_permission."""
PermissionAssignmentModel.query.filter_by(principal_id=principal.id).all()
return True
permission_assignments = (
PermissionAssignmentModel.query.filter_by(
principal_id=principal.id, permission=permission
)
.join(PermissionTargetModel)
.filter_by(uri=target_uri)
.all()
)
if len(permission_assignments) > 1:
raise Exception(
"Multiple permission assignments found for query. That should not be possible."
)
for permission_assignment in permission_assignments:
if permission_assignment.grant_type.value == "permit":
return True
elif permission_assignment.grant_type.value == "deny":
return False
return False
# def refresh_token(self, token: str) -> str:
# """Refresh_token."""

View File

@ -1,12 +1,12 @@
"""Test Permissions."""
from flask.app import Flask
from flask_bpmn.models.db import db
from spiffworkflow_backend.services.authorization_service import AuthorizationService
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
from spiffworkflow_backend.models.permission_assignment import PermissionAssignmentModel
from spiffworkflow_backend.models.permission_target import PermissionTargetModel
from spiffworkflow_backend.services.authorization_service import AuthorizationService
# we think we can get the list of roles for a user.
@ -72,7 +72,15 @@ class TestPermissions(BaseTest):
db.session.add(permission_assignment)
db.session.commit()
has_permission_to_a = AuthorizationService.has_permission(principal=principal, permission="update", target_uri=f"/{process_group_a_id}")
has_permission_to_a = AuthorizationService.has_permission(
principal=principal,
permission="update",
target_uri=f"/{process_group_a_id}",
)
assert has_permission_to_a is True
has_permission_to_b = AuthorizationService.has_permission(principal=principal, permission="update", target_uri=f"/{process_group_b_id}")
has_permission_to_b = AuthorizationService.has_permission(
principal=principal,
permission="update",
target_uri=f"/{process_group_b_id}",
)
assert has_permission_to_b is False