mirror of
https://github.com/sartography/spiffworkflow-backend.git
synced 2025-02-23 04:48:09 +00:00
Merge branch 'main' into bug/refresh-token
# Conflicts: # migrations/versions/cf862b761896_.py # migrations/versions/f1f17d99d118_.py # migrations/versions/fe0828c30b20_.py
This commit is contained in:
commit
259f74a1ee
14
bin/delete_and_import_all_permissions.py
Normal file
14
bin/delete_and_import_all_permissions.py
Normal file
@ -0,0 +1,14 @@
|
||||
"""Deletes all permissions and then re-imports from yaml file."""
|
||||
from spiffworkflow_backend import get_hacked_up_app_for_script
|
||||
from spiffworkflow_backend.services.authorization_service import AuthorizationService
|
||||
|
||||
|
||||
def main() -> None:
|
||||
"""Main."""
|
||||
app = get_hacked_up_app_for_script()
|
||||
with app.app_context():
|
||||
AuthorizationService.delete_all_permissions_and_recreate()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
@ -630,6 +630,28 @@
|
||||
"notBefore": 0,
|
||||
"groups": []
|
||||
},
|
||||
{
|
||||
"id": "9b46f3be-a81d-4b76-92e6-2ac8462f5ec8",
|
||||
"createdTimestamp": 1665688255982,
|
||||
"username": "finance_user1",
|
||||
"enabled": true,
|
||||
"totp": false,
|
||||
"emailVerified": false,
|
||||
"credentials": [
|
||||
{
|
||||
"id": "f14722ec-13a7-4d35-a4ec-0475d405ae58",
|
||||
"type": "password",
|
||||
"createdDate": 1665688275943,
|
||||
"secretData": "{\"value\":\"PlNhf8ShIvaSP3CUwCwAJ2tkqcTCVmCWUy4rbuLSXxEIiuGMu4XeZdsrE82R8PWuDQhlWn/YOUOk38xKZS2ySQ==\",\"salt\":\"m7JGY2cWgFBXMYQSSP2JQQ==\",\"additionalParameters\":{}}",
|
||||
"credentialData": "{\"hashIterations\":27500,\"algorithm\":\"pbkdf2-sha256\",\"additionalParameters\":{}}"
|
||||
}
|
||||
],
|
||||
"disableableCredentialTypes": [],
|
||||
"requiredActions": [],
|
||||
"realmRoles": ["default-roles-spiffworkflow"],
|
||||
"notBefore": 0,
|
||||
"groups": []
|
||||
},
|
||||
{
|
||||
"id": "087bdc16-e362-4340-aa60-1ff71a45f844",
|
||||
"createdTimestamp": 1665516884829,
|
||||
@ -828,31 +850,6 @@
|
||||
"notBefore": 0,
|
||||
"groups": []
|
||||
},
|
||||
{
|
||||
"id": "a15da457-7ebb-49d4-9dcc-6876cb71600d",
|
||||
"createdTimestamp": 1657115919770,
|
||||
"username": "repeat_form_user_1",
|
||||
"enabled": true,
|
||||
"totp": false,
|
||||
"emailVerified": false,
|
||||
"credentials": [
|
||||
{
|
||||
"id": "509dfd8d-a54e-4d8b-b250-ec99e585e15d",
|
||||
"type": "password",
|
||||
"createdDate": 1657298008525,
|
||||
"secretData": "{\"value\":\"/47zG9XBvKg+1P2z6fRL4cyUNn+sB4BgXsxBsvi1NYR9Z20WTeWzzOT2uXvv2ajKMRHrv0OqTesldvSJXARPqA==\",\"salt\":\"dODEHOF24xGPx+7QGaIXWQ==\",\"additionalParameters\":{}}",
|
||||
"credentialData": "{\"hashIterations\":27500,\"algorithm\":\"pbkdf2-sha256\",\"additionalParameters\":{}}"
|
||||
}
|
||||
],
|
||||
"disableableCredentialTypes": [],
|
||||
"requiredActions": [],
|
||||
"realmRoles": ["default-roles-spiffworkflow"],
|
||||
"clientRoles": {
|
||||
"spiffworkflow-backend": ["uma_protection", "repeat-form-role-2"]
|
||||
},
|
||||
"notBefore": 0,
|
||||
"groups": []
|
||||
},
|
||||
{
|
||||
"id": "f3852a7d-8adf-494f-b39d-96ad4c899ee5",
|
||||
"createdTimestamp": 1665516926300,
|
||||
|
@ -10,6 +10,7 @@ from tests.spiffworkflow_backend.helpers.base_test import BaseTest
|
||||
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
|
||||
|
||||
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
|
||||
from spiffworkflow_backend.models.user import UserModel
|
||||
from spiffworkflow_backend.services.process_instance_processor import (
|
||||
ProcessInstanceProcessor,
|
||||
)
|
||||
@ -57,6 +58,7 @@ def with_db_and_bpmn_file_cleanup() -> None:
|
||||
"""Process_group_resource."""
|
||||
for model in SpiffworkflowBaseDBModel._all_subclasses():
|
||||
db.session.query(model).delete()
|
||||
db.session.commit()
|
||||
|
||||
try:
|
||||
yield
|
||||
@ -66,6 +68,12 @@ def with_db_and_bpmn_file_cleanup() -> None:
|
||||
shutil.rmtree(process_model_service.root_path())
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def with_super_admin_user() -> UserModel:
|
||||
"""With_super_admin_user."""
|
||||
return BaseTest.create_user_with_permission("super_admin")
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def setup_process_instances_for_reports() -> list[ProcessInstanceModel]:
|
||||
"""Setup_process_instances_for_reports."""
|
||||
|
@ -1,8 +1,8 @@
|
||||
"""empty message
|
||||
|
||||
Revision ID: f1f17d99d118
|
||||
Revision ID: e6b28d8e3178
|
||||
Revises:
|
||||
Create Date: 2022-10-20 11:52:54.758095
|
||||
Create Date: 2022-10-20 13:05:25.896486
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
@ -10,7 +10,7 @@ import sqlalchemy as sa
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = 'f1f17d99d118'
|
||||
revision = 'e6b28d8e3178'
|
||||
down_revision = None
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
@ -236,8 +236,8 @@ def upgrade():
|
||||
sa.Column('id', sa.Integer(), nullable=False),
|
||||
sa.Column('principal_id', sa.Integer(), nullable=False),
|
||||
sa.Column('permission_target_id', sa.Integer(), nullable=False),
|
||||
sa.Column('grant_type', sa.String(length=50), nullable=True),
|
||||
sa.Column('permission', sa.String(length=50), nullable=True),
|
||||
sa.Column('grant_type', sa.String(length=50), nullable=False),
|
||||
sa.Column('permission', sa.String(length=50), nullable=False),
|
||||
sa.ForeignKeyConstraint(['permission_target_id'], ['permission_target.id'], ),
|
||||
sa.ForeignKeyConstraint(['principal_id'], ['principal.id'], ),
|
||||
sa.PrimaryKeyConstraint('id'),
|
@ -19,7 +19,9 @@ import spiffworkflow_backend.load_database_models # noqa: F401
|
||||
from spiffworkflow_backend.config import setup_config
|
||||
from spiffworkflow_backend.routes.admin_blueprint.admin_blueprint import admin_blueprint
|
||||
from spiffworkflow_backend.routes.process_api_blueprint import process_api_blueprint
|
||||
from spiffworkflow_backend.routes.user import verify_token
|
||||
from spiffworkflow_backend.routes.user_blueprint import user_blueprint
|
||||
from spiffworkflow_backend.services.authorization_service import AuthorizationService
|
||||
from spiffworkflow_backend.services.background_processing_service import (
|
||||
BackgroundProcessingService,
|
||||
)
|
||||
@ -114,6 +116,9 @@ def create_app() -> flask.app.Flask:
|
||||
|
||||
configure_sentry(app)
|
||||
|
||||
app.before_request(verify_token)
|
||||
app.before_request(AuthorizationService.check_for_permission)
|
||||
|
||||
return app # type: ignore
|
||||
|
||||
|
||||
|
@ -1,13 +1,14 @@
|
||||
openapi: "3.0.2"
|
||||
info:
|
||||
version: 1.0.0
|
||||
title: Workflow Microservice
|
||||
title: spiffworkflow-backend
|
||||
license:
|
||||
name: MIT
|
||||
servers:
|
||||
- url: http://localhost:5000/v1.0
|
||||
security:
|
||||
- jwt: ["secret"]
|
||||
# this is handled in flask now
|
||||
security: []
|
||||
# - jwt: ["secret"]
|
||||
# - oAuth2AuthCode:
|
||||
# - read_email
|
||||
# - uid
|
||||
@ -378,7 +379,6 @@ paths:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/OkTrue"
|
||||
# process model update
|
||||
put:
|
||||
operationId: spiffworkflow_backend.routes.process_api_blueprint.process_model_update
|
||||
summary: Modifies an existing process mosel with the given parameters.
|
||||
@ -827,7 +827,6 @@ paths:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/File"
|
||||
# process_model_file_update
|
||||
put:
|
||||
operationId: spiffworkflow_backend.routes.process_api_blueprint.process_model_file_update
|
||||
summary: save the contents to the given file
|
||||
@ -1250,6 +1249,25 @@ paths:
|
||||
"404":
|
||||
description: Secret does not exist
|
||||
|
||||
/permissions-check:
|
||||
post:
|
||||
operationId: spiffworkflow_backend.routes.process_api_blueprint.permissions_check
|
||||
summary: Checks if current user has access to given list of target uris and permissions.
|
||||
tags:
|
||||
- Permissions
|
||||
requestBody:
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Secret"
|
||||
responses:
|
||||
"200":
|
||||
description: Result of permission check
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/Secret"
|
||||
|
||||
components:
|
||||
securitySchemes:
|
||||
jwt:
|
||||
|
@ -1,13 +1,25 @@
|
||||
groups:
|
||||
admin:
|
||||
users:
|
||||
[jakub, kb, alex, dan, mike, jason, amir, jarrad, elizabeth, jon, natalia]
|
||||
[
|
||||
jakub,
|
||||
kb,
|
||||
alex,
|
||||
dan,
|
||||
mike,
|
||||
jason,
|
||||
amir,
|
||||
jarrad,
|
||||
elizabeth,
|
||||
jon,
|
||||
harmeet,
|
||||
sasha,
|
||||
manuchehr,
|
||||
natalia,
|
||||
]
|
||||
|
||||
finance:
|
||||
users: [harmeet, sasha]
|
||||
|
||||
hr:
|
||||
users: [manuchehr]
|
||||
users: [finance_user1]
|
||||
|
||||
permissions:
|
||||
admin:
|
||||
@ -20,10 +32,10 @@ permissions:
|
||||
groups: [finance]
|
||||
users: []
|
||||
allowed_permissions: [create, read, update, delete]
|
||||
uri: /v1.0/process-groups/finance/*
|
||||
uri: /v1.0/process-groups/execute-procure-to-pay/*
|
||||
|
||||
read-all:
|
||||
groups: [finance, hr, admin]
|
||||
groups: [finance, admin]
|
||||
users: []
|
||||
allowed_permissions: [read]
|
||||
uri: /*
|
||||
|
@ -11,3 +11,7 @@ SPIFFWORKFLOW_BACKEND_LOG_TO_FILE = (
|
||||
SPIFFWORKFLOW_BACKEND_PERMISSIONS_FILE_NAME = environ.get(
|
||||
"SPIFFWORKFLOW_BACKEND_PERMISSIONS_FILE_NAME", default="testing.yml"
|
||||
)
|
||||
|
||||
SPIFFWORKFLOW_BACKEND_LOG_LEVEL = environ.get(
|
||||
"SPIFFWORKFLOW_BACKEND_LOG_LEVEL", default="debug"
|
||||
)
|
||||
|
@ -29,4 +29,4 @@ class GroupModel(FlaskBpmnGroupModel):
|
||||
secondary="user_group_assignment",
|
||||
overlaps="user_group_assignments,users",
|
||||
)
|
||||
principal = relationship("PrincipalModel", uselist=False) # type: ignore
|
||||
principal = relationship("PrincipalModel", uselist=False, cascade="all, delete") # type: ignore
|
||||
|
@ -31,7 +31,13 @@ class Permission(enum.Enum):
|
||||
read = "read"
|
||||
update = "update"
|
||||
delete = "delete"
|
||||
|
||||
# maybe read to GET process_model/process-instances instead?
|
||||
list = "list"
|
||||
|
||||
# maybe use create instead on
|
||||
# POST http://localhost:7000/v1.0/process-models/category_number_one/call-activity/process-instances/*
|
||||
# POST http://localhost:7000/v1.0/process-models/category_number_one/call-activity/process-instances/332/run
|
||||
instantiate = "instantiate" # this is something you do to a process model
|
||||
|
||||
|
||||
@ -50,10 +56,10 @@ class PermissionAssignmentModel(SpiffworkflowBaseDBModel):
|
||||
id = db.Column(db.Integer, primary_key=True)
|
||||
principal_id = db.Column(ForeignKey(PrincipalModel.id), nullable=False)
|
||||
permission_target_id = db.Column(
|
||||
ForeignKey(PermissionTargetModel.id), nullable=False
|
||||
ForeignKey(PermissionTargetModel.id), nullable=False # type: ignore
|
||||
)
|
||||
grant_type = db.Column(db.String(50))
|
||||
permission = db.Column(db.String(50))
|
||||
grant_type = db.Column(db.String(50), nullable=False)
|
||||
permission = db.Column(db.String(50), nullable=False)
|
||||
|
||||
@validates("grant_type")
|
||||
def validate_grant_type(self, key: str, value: str) -> Any:
|
||||
|
@ -1,5 +1,7 @@
|
||||
"""PermissionTarget."""
|
||||
import re
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
from flask_bpmn.models.db import db
|
||||
from flask_bpmn.models.db import SpiffworkflowBaseDBModel
|
||||
@ -10,13 +12,23 @@ class InvalidPermissionTargetUriError(Exception):
|
||||
"""InvalidPermissionTargetUriError."""
|
||||
|
||||
|
||||
@dataclass
|
||||
class PermissionTargetModel(SpiffworkflowBaseDBModel):
|
||||
"""PermissionTargetModel."""
|
||||
|
||||
URI_ALL = "/%"
|
||||
|
||||
__tablename__ = "permission_target"
|
||||
|
||||
id = db.Column(db.Integer, primary_key=True)
|
||||
uri = db.Column(db.String(255), unique=True, nullable=False)
|
||||
id: int = db.Column(db.Integer, primary_key=True)
|
||||
uri: str = db.Column(db.String(255), unique=True, nullable=False)
|
||||
|
||||
def __init__(self, uri: str, id: Optional[int] = None):
|
||||
"""__init__."""
|
||||
if id:
|
||||
self.id = id
|
||||
uri_with_percent = re.sub(r"\*", "%", uri)
|
||||
self.uri = uri_with_percent
|
||||
|
||||
@validates("uri")
|
||||
def validate_uri(self, key: str, value: str) -> str:
|
||||
|
@ -55,6 +55,7 @@ from spiffworkflow_backend.models.secret_model import SecretModelSchema
|
||||
from spiffworkflow_backend.models.spiff_logging import SpiffLoggingModel
|
||||
from spiffworkflow_backend.models.user import UserModel
|
||||
from spiffworkflow_backend.routes.user import verify_token
|
||||
from spiffworkflow_backend.services.authorization_service import AuthorizationService
|
||||
from spiffworkflow_backend.services.error_handling_service import ErrorHandlingService
|
||||
from spiffworkflow_backend.services.file_system_service import FileSystemService
|
||||
from spiffworkflow_backend.services.git_service import GitService
|
||||
@ -97,6 +98,39 @@ def status() -> flask.wrappers.Response:
|
||||
return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")
|
||||
|
||||
|
||||
def permissions_check(body: Dict[str, Dict[str, list[str]]]) -> flask.wrappers.Response:
|
||||
"""Permissions_check."""
|
||||
if "requests_to_check" not in body:
|
||||
raise (
|
||||
ApiError(
|
||||
error_code="could_not_requests_to_check",
|
||||
message="The key 'requests_to_check' not found at root of request body.",
|
||||
status_code=400,
|
||||
)
|
||||
)
|
||||
|
||||
response_dict: dict[str, dict[str, bool]] = {}
|
||||
requests_to_check = body["requests_to_check"]
|
||||
|
||||
for target_uri, http_methods in requests_to_check.items():
|
||||
if target_uri not in response_dict:
|
||||
response_dict[target_uri] = {}
|
||||
|
||||
for http_method in http_methods:
|
||||
permission_string = AuthorizationService.get_permission_from_http_method(
|
||||
http_method
|
||||
)
|
||||
if permission_string:
|
||||
has_permission = AuthorizationService.user_has_permission(
|
||||
user=g.user,
|
||||
permission=permission_string,
|
||||
target_uri=target_uri,
|
||||
)
|
||||
response_dict[target_uri][http_method] = has_permission
|
||||
|
||||
return make_response(jsonify({"results": response_dict}), 200)
|
||||
|
||||
|
||||
def process_group_add(
|
||||
body: Dict[str, Union[str, bool, int]]
|
||||
) -> flask.wrappers.Response:
|
||||
@ -794,7 +828,7 @@ def authentication_callback(
|
||||
auth_method: str,
|
||||
) -> werkzeug.wrappers.Response:
|
||||
"""Authentication_callback."""
|
||||
verify_token(request.args.get("token"))
|
||||
verify_token(request.args.get("token"), force_run=True)
|
||||
response = request.args["response"]
|
||||
SecretService().update_secret(
|
||||
f"{service}/{auth_method}", response, g.user.id, create_if_not_exists=True
|
||||
@ -847,6 +881,8 @@ def process_instance_report_show(
|
||||
return Response(json.dumps(result_dict), status=200, mimetype="application/json")
|
||||
|
||||
|
||||
# TODO: see comment for before_request
|
||||
# @process_api_blueprint.route("/v1.0/tasks", methods=["GET"])
|
||||
def task_list_my_tasks(page: int = 1, per_page: int = 100) -> flask.wrappers.Response:
|
||||
"""Task_list_my_tasks."""
|
||||
principal = find_principal_or_raise()
|
||||
|
@ -10,6 +10,7 @@ import jwt
|
||||
from flask import current_app
|
||||
from flask import g
|
||||
from flask import redirect
|
||||
from flask import request
|
||||
from flask_bpmn.api.api_error import ApiError
|
||||
from werkzeug.wrappers import Response
|
||||
|
||||
@ -26,13 +27,17 @@ from spiffworkflow_backend.services.user_service import UserService
|
||||
"""
|
||||
|
||||
|
||||
def verify_token(token: Optional[str] = None) -> Dict[str, Optional[Union[str, int]]]:
|
||||
# authorization_exclusion_list = ['status']
|
||||
def verify_token(
|
||||
token: Optional[str] = None, force_run: Optional[bool] = False
|
||||
) -> Optional[Dict[str, Optional[Union[str, int]]]]:
|
||||
"""Verify the token for the user (if provided).
|
||||
|
||||
If in production environment and token is not provided, gets user from the SSO headers and returns their token.
|
||||
|
||||
Args:
|
||||
token: Optional[str]
|
||||
force_run: Optional[bool]
|
||||
|
||||
Returns:
|
||||
token: str
|
||||
@ -41,6 +46,12 @@ def verify_token(token: Optional[str] = None) -> Dict[str, Optional[Union[str, i
|
||||
ApiError: If not on production and token is not valid, returns an 'invalid_token' 403 error.
|
||||
If on production and user is not authenticated, returns a 'no_user' 403 error.
|
||||
"""
|
||||
if not force_run and AuthorizationService.should_disable_auth_for_request():
|
||||
return None
|
||||
|
||||
if not token and "Authorization" in request.headers:
|
||||
token = request.headers["Authorization"].removeprefix("Bearer ")
|
||||
|
||||
if token:
|
||||
user_model = None
|
||||
decoded_token = get_decoded_token(token)
|
||||
@ -132,8 +143,9 @@ def verify_token(token: Optional[str] = None) -> Dict[str, Optional[Union[str, i
|
||||
if g.user:
|
||||
# This is an id token, so we don't have a refresh token yet
|
||||
g.token = token
|
||||
scope = get_scope(token)
|
||||
return {"uid": g.user.id, "sub": g.user.id, "scope": scope}
|
||||
get_scope(token)
|
||||
return None
|
||||
# return {"uid": g.user.id, "sub": g.user.id, "scope": scope}
|
||||
# return validate_scope(token, user_info, user_model)
|
||||
else:
|
||||
raise ApiError(error_code="no_user_id", message="Cannot get a user id")
|
||||
|
@ -6,6 +6,8 @@ from typing import Union
|
||||
import jwt
|
||||
import yaml
|
||||
from flask import current_app
|
||||
from flask import g
|
||||
from flask import request
|
||||
from flask_bpmn.api.api_error import ApiError
|
||||
from flask_bpmn.models.db import db
|
||||
from sqlalchemy import text
|
||||
@ -21,6 +23,10 @@ from spiffworkflow_backend.models.user_group_assignment import UserGroupAssignme
|
||||
from spiffworkflow_backend.services.user_service import UserService
|
||||
|
||||
|
||||
class PermissionsFileNotSetError(Exception):
|
||||
"""PermissionsFileNotSetError."""
|
||||
|
||||
|
||||
class AuthorizationService:
|
||||
"""Determine whether a user has permission to perform their request."""
|
||||
|
||||
@ -47,7 +53,9 @@ class AuthorizationService:
|
||||
elif permission_assignment.grant_type == "deny":
|
||||
return False
|
||||
else:
|
||||
raise Exception("Unknown grant type")
|
||||
raise Exception(
|
||||
f"Unknown grant type: {permission_assignment.grant_type}"
|
||||
)
|
||||
|
||||
return False
|
||||
|
||||
@ -72,11 +80,31 @@ class AuthorizationService:
|
||||
|
||||
return cls.has_permission(principals, permission, target_uri)
|
||||
|
||||
@classmethod
|
||||
def delete_all_permissions_and_recreate(cls) -> None:
|
||||
"""Delete_all_permissions_and_recreate."""
|
||||
for model in [PermissionAssignmentModel, PermissionTargetModel]:
|
||||
db.session.query(model).delete()
|
||||
|
||||
# cascading to principals doesn't seem to work when attempting to delete all so do it like this instead
|
||||
for group in GroupModel.query.all():
|
||||
db.session.delete(group)
|
||||
|
||||
db.session.commit()
|
||||
cls.import_permissions_from_yaml_file()
|
||||
|
||||
@classmethod
|
||||
def import_permissions_from_yaml_file(
|
||||
cls, raise_if_missing_user: bool = False
|
||||
) -> None:
|
||||
"""Import_permissions_from_yaml_file."""
|
||||
if current_app.config["SPIFFWORKFLOW_BACKEND_PERMISSIONS_FILE_NAME"] is None:
|
||||
raise (
|
||||
PermissionsFileNotSetError(
|
||||
"SPIFFWORKFLOW_BACKEND_PERMISSIONS_FILE_NAME needs to be set in order to import permissions"
|
||||
)
|
||||
)
|
||||
|
||||
permission_configs = None
|
||||
with open(current_app.config["PERMISSIONS_FILE_FULLPATH"]) as file:
|
||||
permission_configs = yaml.safe_load(file)
|
||||
@ -171,6 +199,88 @@ class AuthorizationService:
|
||||
db.session.commit()
|
||||
return permission_assignment
|
||||
|
||||
@classmethod
|
||||
def should_disable_auth_for_request(cls) -> bool:
|
||||
"""Should_disable_auth_for_request."""
|
||||
authentication_exclusion_list = ["status", "authentication_callback"]
|
||||
if request.method == "OPTIONS":
|
||||
return True
|
||||
|
||||
# if the endpoint does not exist then let the system 404
|
||||
#
|
||||
# for some reason this runs before connexion checks if the
|
||||
# endpoint exists.
|
||||
if not request.endpoint:
|
||||
return True
|
||||
|
||||
api_view_function = current_app.view_functions[request.endpoint]
|
||||
if (
|
||||
api_view_function
|
||||
and api_view_function.__name__.startswith("login")
|
||||
or api_view_function.__name__.startswith("logout")
|
||||
or api_view_function.__name__ in authentication_exclusion_list
|
||||
):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
@classmethod
|
||||
def get_permission_from_http_method(cls, http_method: str) -> Optional[str]:
|
||||
"""Get_permission_from_request_method."""
|
||||
request_method_mapper = {
|
||||
"POST": "create",
|
||||
"GET": "read",
|
||||
"PUT": "update",
|
||||
"DELETE": "delete",
|
||||
}
|
||||
if http_method in request_method_mapper:
|
||||
return request_method_mapper[http_method]
|
||||
|
||||
return None
|
||||
|
||||
# TODO: we can add the before_request to the blueprint
|
||||
# directly when we switch over from connexion routes
|
||||
# to blueprint routes
|
||||
# @process_api_blueprint.before_request
|
||||
|
||||
@classmethod
|
||||
def check_for_permission(cls) -> None:
|
||||
"""Check_for_permission."""
|
||||
if cls.should_disable_auth_for_request():
|
||||
return None
|
||||
|
||||
authorization_exclusion_list = ["permissions_check"]
|
||||
|
||||
if not hasattr(g, "user"):
|
||||
raise ApiError(
|
||||
error_code="user_not_logged_in",
|
||||
message="User is not logged in. Please log in",
|
||||
status_code=401,
|
||||
)
|
||||
|
||||
api_view_function = current_app.view_functions[request.endpoint]
|
||||
if (
|
||||
api_view_function
|
||||
and api_view_function.__name__ in authorization_exclusion_list
|
||||
):
|
||||
return None
|
||||
|
||||
permission_string = cls.get_permission_from_http_method(request.method)
|
||||
if permission_string:
|
||||
has_permission = AuthorizationService.user_has_permission(
|
||||
user=g.user,
|
||||
permission=permission_string,
|
||||
target_uri=request.path,
|
||||
)
|
||||
if has_permission:
|
||||
return None
|
||||
|
||||
raise ApiError(
|
||||
error_code="unauthorized",
|
||||
message="User is not authorized to perform requested action.",
|
||||
status_code=403,
|
||||
)
|
||||
|
||||
# def refresh_token(self, token: str) -> str:
|
||||
# """Refresh_token."""
|
||||
# # if isinstance(token, str):
|
||||
|
@ -889,9 +889,6 @@ class ProcessInstanceProcessor:
|
||||
self.process_bpmn_messages()
|
||||
self.queue_waiting_receive_messages()
|
||||
|
||||
if save:
|
||||
self.save()
|
||||
|
||||
except WorkflowTaskExecException as we:
|
||||
raise ApiError.from_workflow_exception("task_error", str(we), we) from we
|
||||
|
||||
|
@ -15,6 +15,8 @@ from flask_bpmn.models.db import db
|
||||
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
|
||||
from werkzeug.test import TestResponse # type: ignore
|
||||
|
||||
from spiffworkflow_backend.models.permission_assignment import Permission
|
||||
from spiffworkflow_backend.models.permission_target import PermissionTargetModel
|
||||
from spiffworkflow_backend.models.process_group import ProcessGroup
|
||||
from spiffworkflow_backend.models.process_group import ProcessGroupSchema
|
||||
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
|
||||
@ -92,6 +94,7 @@ class BaseTest:
|
||||
exception_notification_addresses: Optional[list] = None,
|
||||
primary_process_id: Optional[str] = None,
|
||||
primary_file_name: Optional[str] = None,
|
||||
user: Optional[UserModel] = None,
|
||||
) -> TestResponse:
|
||||
"""Create_process_model."""
|
||||
process_model_service = ProcessModelService()
|
||||
@ -121,7 +124,9 @@ class BaseTest:
|
||||
fault_or_suspend_on_exception=fault_or_suspend_on_exception,
|
||||
exception_notification_addresses=exception_notification_addresses,
|
||||
)
|
||||
user = self.find_or_create_user()
|
||||
if user is None:
|
||||
user = self.find_or_create_user()
|
||||
|
||||
response = client.post(
|
||||
"/v1.0/process-models",
|
||||
content_type="application/json",
|
||||
@ -139,6 +144,7 @@ class BaseTest:
|
||||
process_model: Optional[ProcessModelInfo] = None,
|
||||
file_name: str = "random_fact.svg",
|
||||
file_data: bytes = b"abcdef",
|
||||
user: Optional[UserModel] = None,
|
||||
) -> Any:
|
||||
"""Test_create_spec_file."""
|
||||
if process_model is None:
|
||||
@ -146,7 +152,8 @@ class BaseTest:
|
||||
process_model_id, process_group_id=process_group_id
|
||||
)
|
||||
data = {"file": (io.BytesIO(file_data), file_name)}
|
||||
user = self.find_or_create_user()
|
||||
if user is None:
|
||||
user = self.find_or_create_user()
|
||||
response = client.post(
|
||||
f"/v1.0/process-models/{process_model.process_group_id}/{process_model.id}/files",
|
||||
data=data,
|
||||
@ -218,6 +225,46 @@ class BaseTest:
|
||||
db.session.commit()
|
||||
return process_instance
|
||||
|
||||
@classmethod
|
||||
def create_user_with_permission(
|
||||
cls,
|
||||
username: str,
|
||||
target_uri: str = PermissionTargetModel.URI_ALL,
|
||||
permission_names: Optional[list[str]] = None,
|
||||
) -> UserModel:
|
||||
"""Create_user_with_permission."""
|
||||
user = BaseTest.find_or_create_user(username=username)
|
||||
return cls.add_permissions_to_user(
|
||||
user, target_uri=target_uri, permission_names=permission_names
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def add_permissions_to_user(
|
||||
cls,
|
||||
user: UserModel,
|
||||
target_uri: str = PermissionTargetModel.URI_ALL,
|
||||
permission_names: Optional[list[str]] = None,
|
||||
) -> UserModel:
|
||||
"""Add_permissions_to_user."""
|
||||
permission_target = PermissionTargetModel.query.filter_by(
|
||||
uri=target_uri
|
||||
).first()
|
||||
if permission_target is None:
|
||||
permission_target = PermissionTargetModel(uri=target_uri)
|
||||
db.session.add(permission_target)
|
||||
db.session.commit()
|
||||
|
||||
if permission_names is None:
|
||||
permission_names = [member.name for member in Permission]
|
||||
|
||||
for permission in permission_names:
|
||||
AuthorizationService.create_permission_for_principal(
|
||||
principal=user.principal,
|
||||
permission_target=permission_target,
|
||||
permission=permission,
|
||||
)
|
||||
return user
|
||||
|
||||
@staticmethod
|
||||
def logged_in_headers(
|
||||
user: UserModel, _redirect_url: str = "http://some/frontend/url"
|
||||
|
@ -3,18 +3,23 @@ from flask.app import Flask
|
||||
from flask.testing import FlaskClient
|
||||
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
|
||||
|
||||
from spiffworkflow_backend.models.user import UserModel
|
||||
|
||||
|
||||
class TestLoggingService(BaseTest):
|
||||
"""Test logging service."""
|
||||
|
||||
def test_logging_service_spiff_logger(
|
||||
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
|
||||
self,
|
||||
app: Flask,
|
||||
client: FlaskClient,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
with_super_admin_user: UserModel,
|
||||
) -> None:
|
||||
"""Test_process_instance_run."""
|
||||
process_group_id = "test_logging_spiff_logger"
|
||||
process_model_id = "simple_script"
|
||||
user = self.find_or_create_user()
|
||||
headers = self.logged_in_headers(user)
|
||||
headers = self.logged_in_headers(with_super_admin_user)
|
||||
response = self.create_process_instance(
|
||||
client, process_group_id, process_model_id, headers
|
||||
)
|
||||
@ -22,13 +27,13 @@ class TestLoggingService(BaseTest):
|
||||
process_instance_id = response.json["id"]
|
||||
response = client.post(
|
||||
f"/v1.0/process-models/{process_group_id}/{process_model_id}/process-instances/{process_instance_id}/run",
|
||||
headers=self.logged_in_headers(user),
|
||||
headers=headers,
|
||||
)
|
||||
assert response.status_code == 200
|
||||
|
||||
log_response = client.get(
|
||||
f"/v1.0/process-models/{process_group_id}/{process_model_id}/process-instances/{process_instance_id}/logs",
|
||||
headers=self.logged_in_headers(user),
|
||||
headers=headers,
|
||||
)
|
||||
assert log_response.status_code == 200
|
||||
assert log_response.json
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -48,6 +48,7 @@ class SecretServiceTestHelpers(BaseTest):
|
||||
process_model_id=self.test_process_model_id,
|
||||
process_model_display_name=self.test_process_model_display_name,
|
||||
process_model_description=self.test_process_model_description,
|
||||
user=user,
|
||||
)
|
||||
process_model_info = ProcessModelService().get_process_model(
|
||||
self.test_process_model_id, self.test_process_group_id
|
||||
@ -58,118 +59,153 @@ class SecretServiceTestHelpers(BaseTest):
|
||||
class TestSecretService(SecretServiceTestHelpers):
|
||||
"""TestSecretService."""
|
||||
|
||||
def test_add_secret(self, app: Flask, with_db_and_bpmn_file_cleanup: None) -> None:
|
||||
def test_add_secret(
|
||||
self,
|
||||
app: Flask,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
with_super_admin_user: UserModel,
|
||||
) -> None:
|
||||
"""Test_add_secret."""
|
||||
user = self.find_or_create_user()
|
||||
test_secret = self.add_test_secret(user)
|
||||
test_secret = self.add_test_secret(with_super_admin_user)
|
||||
|
||||
assert test_secret is not None
|
||||
assert test_secret.key == self.test_key
|
||||
assert test_secret.value == self.test_value
|
||||
assert test_secret.creator_user_id == user.id
|
||||
assert test_secret.creator_user_id == with_super_admin_user.id
|
||||
|
||||
def test_add_secret_duplicate_key_fails(
|
||||
self, app: Flask, with_db_and_bpmn_file_cleanup: None
|
||||
self,
|
||||
app: Flask,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
with_super_admin_user: UserModel,
|
||||
) -> None:
|
||||
"""Test_add_secret_duplicate_key_fails."""
|
||||
user = self.find_or_create_user()
|
||||
self.add_test_secret(user)
|
||||
self.add_test_secret(with_super_admin_user)
|
||||
with pytest.raises(ApiError) as ae:
|
||||
self.add_test_secret(user)
|
||||
self.add_test_secret(with_super_admin_user)
|
||||
assert ae.value.error_code == "create_secret_error"
|
||||
|
||||
def test_get_secret(self, app: Flask, with_db_and_bpmn_file_cleanup: None) -> None:
|
||||
def test_get_secret(
|
||||
self,
|
||||
app: Flask,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
with_super_admin_user: UserModel,
|
||||
) -> None:
|
||||
"""Test_get_secret."""
|
||||
user = self.find_or_create_user()
|
||||
self.add_test_secret(user)
|
||||
self.add_test_secret(with_super_admin_user)
|
||||
|
||||
secret = SecretService().get_secret(self.test_key)
|
||||
assert secret is not None
|
||||
assert secret.value == self.test_value
|
||||
|
||||
def test_get_secret_bad_key_fails(
|
||||
self, app: Flask, with_db_and_bpmn_file_cleanup: None
|
||||
self,
|
||||
app: Flask,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
with_super_admin_user: UserModel,
|
||||
) -> None:
|
||||
"""Test_get_secret_bad_service."""
|
||||
user = self.find_or_create_user()
|
||||
self.add_test_secret(user)
|
||||
self.add_test_secret(with_super_admin_user)
|
||||
|
||||
with pytest.raises(ApiError):
|
||||
SecretService().get_secret("bad_key")
|
||||
|
||||
def test_update_secret(
|
||||
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
|
||||
self,
|
||||
app: Flask,
|
||||
client: FlaskClient,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
with_super_admin_user: UserModel,
|
||||
) -> None:
|
||||
"""Test update secret."""
|
||||
user = self.find_or_create_user()
|
||||
self.add_test_secret(user)
|
||||
self.add_test_secret(with_super_admin_user)
|
||||
secret = SecretService.get_secret(self.test_key)
|
||||
assert secret
|
||||
assert secret.value == self.test_value
|
||||
SecretService.update_secret(self.test_key, "new_secret_value", user.id)
|
||||
SecretService.update_secret(
|
||||
self.test_key, "new_secret_value", with_super_admin_user.id
|
||||
)
|
||||
new_secret = SecretService.get_secret(self.test_key)
|
||||
assert new_secret
|
||||
assert new_secret.value == "new_secret_value" # noqa: S105
|
||||
|
||||
def test_update_secret_bad_user_fails(
|
||||
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
|
||||
self,
|
||||
app: Flask,
|
||||
client: FlaskClient,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
with_super_admin_user: UserModel,
|
||||
) -> None:
|
||||
"""Test_update_secret_bad_user."""
|
||||
user = self.find_or_create_user()
|
||||
self.add_test_secret(user)
|
||||
self.add_test_secret(with_super_admin_user)
|
||||
with pytest.raises(ApiError) as ae:
|
||||
SecretService.update_secret(
|
||||
self.test_key, "new_secret_value", user.id + 1
|
||||
self.test_key, "new_secret_value", with_super_admin_user.id + 1
|
||||
) # noqa: S105
|
||||
assert (
|
||||
ae.value.message
|
||||
== f"User: {user.id+1} cannot update the secret with key : test_key"
|
||||
== f"User: {with_super_admin_user.id+1} cannot update the secret with key : test_key"
|
||||
)
|
||||
|
||||
def test_update_secret_bad_secret_fails(
|
||||
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
|
||||
self,
|
||||
app: Flask,
|
||||
client: FlaskClient,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
with_super_admin_user: UserModel,
|
||||
) -> None:
|
||||
"""Test_update_secret_bad_secret_fails."""
|
||||
user = self.find_or_create_user()
|
||||
secret = self.add_test_secret(user)
|
||||
secret = self.add_test_secret(with_super_admin_user)
|
||||
with pytest.raises(ApiError) as ae:
|
||||
SecretService.update_secret(secret.key + "x", "some_new_value", user.id)
|
||||
SecretService.update_secret(
|
||||
secret.key + "x", "some_new_value", with_super_admin_user.id
|
||||
)
|
||||
assert "Resource does not exist" in ae.value.message
|
||||
assert ae.value.error_code == "update_secret_error"
|
||||
|
||||
def test_delete_secret(
|
||||
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
|
||||
self,
|
||||
app: Flask,
|
||||
client: FlaskClient,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
with_super_admin_user: UserModel,
|
||||
) -> None:
|
||||
"""Test delete secret."""
|
||||
user = self.find_or_create_user()
|
||||
self.add_test_secret(user)
|
||||
self.add_test_secret(with_super_admin_user)
|
||||
secrets = SecretModel.query.all()
|
||||
assert len(secrets) == 1
|
||||
assert secrets[0].creator_user_id == user.id
|
||||
SecretService.delete_secret(self.test_key, user.id)
|
||||
assert secrets[0].creator_user_id == with_super_admin_user.id
|
||||
SecretService.delete_secret(self.test_key, with_super_admin_user.id)
|
||||
secrets = SecretModel.query.all()
|
||||
assert len(secrets) == 0
|
||||
|
||||
def test_delete_secret_bad_user_fails(
|
||||
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
|
||||
self,
|
||||
app: Flask,
|
||||
client: FlaskClient,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
with_super_admin_user: UserModel,
|
||||
) -> None:
|
||||
"""Test_delete_secret_bad_user."""
|
||||
user = self.find_or_create_user()
|
||||
self.add_test_secret(user)
|
||||
self.add_test_secret(with_super_admin_user)
|
||||
with pytest.raises(ApiError) as ae:
|
||||
SecretService.delete_secret(self.test_key, user.id + 1)
|
||||
SecretService.delete_secret(self.test_key, with_super_admin_user.id + 1)
|
||||
assert (
|
||||
f"User: {user.id+1} cannot delete the secret with key" in ae.value.message
|
||||
f"User: {with_super_admin_user.id+1} cannot delete the secret with key"
|
||||
in ae.value.message
|
||||
)
|
||||
|
||||
def test_delete_secret_bad_secret_fails(
|
||||
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
|
||||
self,
|
||||
app: Flask,
|
||||
client: FlaskClient,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
with_super_admin_user: UserModel,
|
||||
) -> None:
|
||||
"""Test_delete_secret_bad_secret_fails."""
|
||||
user = self.find_or_create_user()
|
||||
self.add_test_secret(user)
|
||||
self.add_test_secret(with_super_admin_user)
|
||||
with pytest.raises(ApiError) as ae:
|
||||
SecretService.delete_secret(self.test_key + "x", user.id)
|
||||
SecretService.delete_secret(self.test_key + "x", with_super_admin_user.id)
|
||||
assert "Resource does not exist" in ae.value.message
|
||||
|
||||
|
||||
@ -177,19 +213,22 @@ class TestSecretServiceApi(SecretServiceTestHelpers):
|
||||
"""TestSecretServiceApi."""
|
||||
|
||||
def test_add_secret(
|
||||
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
|
||||
self,
|
||||
app: Flask,
|
||||
client: FlaskClient,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
with_super_admin_user: UserModel,
|
||||
) -> None:
|
||||
"""Test_add_secret."""
|
||||
user = self.find_or_create_user()
|
||||
secret_model = SecretModel(
|
||||
key=self.test_key,
|
||||
value=self.test_value,
|
||||
creator_user_id=user.id,
|
||||
creator_user_id=with_super_admin_user.id,
|
||||
)
|
||||
data = json.dumps(SecretModelSchema().dump(secret_model))
|
||||
response: TestResponse = client.post(
|
||||
"/v1.0/secrets",
|
||||
headers=self.logged_in_headers(user),
|
||||
headers=self.logged_in_headers(with_super_admin_user),
|
||||
content_type="application/json",
|
||||
data=data,
|
||||
)
|
||||
@ -199,17 +238,20 @@ class TestSecretServiceApi(SecretServiceTestHelpers):
|
||||
assert key in secret.keys()
|
||||
assert secret["key"] == self.test_key
|
||||
assert secret["value"] == self.test_value
|
||||
assert secret["creator_user_id"] == user.id
|
||||
assert secret["creator_user_id"] == with_super_admin_user.id
|
||||
|
||||
def test_get_secret(
|
||||
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
|
||||
self,
|
||||
app: Flask,
|
||||
client: FlaskClient,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
with_super_admin_user: UserModel,
|
||||
) -> None:
|
||||
"""Test get secret."""
|
||||
user = self.find_or_create_user()
|
||||
self.add_test_secret(user)
|
||||
self.add_test_secret(with_super_admin_user)
|
||||
secret_response = client.get(
|
||||
f"/v1.0/secrets/{self.test_key}",
|
||||
headers=self.logged_in_headers(user),
|
||||
headers=self.logged_in_headers(with_super_admin_user),
|
||||
)
|
||||
assert secret_response
|
||||
assert secret_response.status_code == 200
|
||||
@ -217,20 +259,25 @@ class TestSecretServiceApi(SecretServiceTestHelpers):
|
||||
assert secret_response.json["value"] == self.test_value
|
||||
|
||||
def test_update_secret(
|
||||
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
|
||||
self,
|
||||
app: Flask,
|
||||
client: FlaskClient,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
with_super_admin_user: UserModel,
|
||||
) -> None:
|
||||
"""Test_update_secret."""
|
||||
user = self.find_or_create_user()
|
||||
self.add_test_secret(user)
|
||||
self.add_test_secret(with_super_admin_user)
|
||||
secret: Optional[SecretModel] = SecretService.get_secret(self.test_key)
|
||||
assert secret
|
||||
assert secret.value == self.test_value
|
||||
secret_model = SecretModel(
|
||||
key=self.test_key, value="new_secret_value", creator_user_id=user.id
|
||||
key=self.test_key,
|
||||
value="new_secret_value",
|
||||
creator_user_id=with_super_admin_user.id,
|
||||
)
|
||||
response = client.put(
|
||||
f"/v1.0/secrets/{self.test_key}",
|
||||
headers=self.logged_in_headers(user),
|
||||
headers=self.logged_in_headers(with_super_admin_user),
|
||||
content_type="application/json",
|
||||
data=json.dumps(SecretModelSchema().dump(secret_model)),
|
||||
)
|
||||
@ -242,42 +289,61 @@ class TestSecretServiceApi(SecretServiceTestHelpers):
|
||||
assert secret_model.value == "new_secret_value"
|
||||
|
||||
def test_delete_secret(
|
||||
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
|
||||
self,
|
||||
app: Flask,
|
||||
client: FlaskClient,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
with_super_admin_user: UserModel,
|
||||
) -> None:
|
||||
"""Test delete secret."""
|
||||
user = self.find_or_create_user()
|
||||
self.add_test_secret(user)
|
||||
self.add_test_secret(with_super_admin_user)
|
||||
secret = SecretService.get_secret(self.test_key)
|
||||
assert secret
|
||||
assert secret.value == self.test_value
|
||||
secret_response = client.delete(
|
||||
f"/v1.0/secrets/{self.test_key}",
|
||||
headers=self.logged_in_headers(user),
|
||||
headers=self.logged_in_headers(with_super_admin_user),
|
||||
)
|
||||
assert secret_response.status_code == 200
|
||||
with pytest.raises(ApiError):
|
||||
secret = SecretService.get_secret(self.test_key)
|
||||
|
||||
def test_delete_secret_bad_user(
|
||||
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
|
||||
self,
|
||||
app: Flask,
|
||||
client: FlaskClient,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
with_super_admin_user: UserModel,
|
||||
) -> None:
|
||||
"""Test_delete_secret_bad_user."""
|
||||
user_1 = self.find_or_create_user()
|
||||
user_2 = self.find_or_create_user("test_user_2")
|
||||
self.add_test_secret(user_1)
|
||||
|
||||
# ensure user has permissions to delete the given secret
|
||||
self.add_permissions_to_user(
|
||||
user_2,
|
||||
target_uri=f"/v1.0/secrets/{self.test_key}",
|
||||
permission_names=["delete"],
|
||||
)
|
||||
secret_response = client.delete(
|
||||
f"/v1.0/secrets/{self.test_key}",
|
||||
headers=self.logged_in_headers(user_2),
|
||||
)
|
||||
assert secret_response.status_code == 401
|
||||
assert secret_response.json
|
||||
assert secret_response.json["error_code"] == "delete_secret_error"
|
||||
|
||||
def test_delete_secret_bad_key(
|
||||
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
|
||||
self,
|
||||
app: Flask,
|
||||
client: FlaskClient,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
with_super_admin_user: UserModel,
|
||||
) -> None:
|
||||
"""Test delete secret."""
|
||||
user = self.find_or_create_user()
|
||||
secret_response = client.delete(
|
||||
"/v1.0/secrets/bad_secret_key",
|
||||
headers=self.logged_in_headers(user),
|
||||
headers=self.logged_in_headers(with_super_admin_user),
|
||||
)
|
||||
assert secret_response.status_code == 404
|
||||
|
@ -13,10 +13,10 @@ from spiffworkflow_backend.models.permission_target import PermissionTargetModel
|
||||
class TestPermissionTarget(BaseTest):
|
||||
"""TestPermissionTarget."""
|
||||
|
||||
def test_asterisk_must_go_at_the_end_of_uri(
|
||||
def test_wildcard_must_go_at_the_end_of_uri(
|
||||
self, app: Flask, with_db_and_bpmn_file_cleanup: None
|
||||
) -> None:
|
||||
"""Test_asterisk_must_go_at_the_end_of_uri."""
|
||||
"""Test_wildcard_must_go_at_the_end_of_uri."""
|
||||
permission_target = PermissionTargetModel(uri="/test_group/%")
|
||||
db.session.add(permission_target)
|
||||
db.session.commit()
|
||||
@ -30,3 +30,13 @@ class TestPermissionTarget(BaseTest):
|
||||
assert (
|
||||
str(exception.value) == "Wildcard must appear at end: /test_group/%/model"
|
||||
)
|
||||
|
||||
def test_can_change_asterisk_to_percent_on_creation(
|
||||
self, app: Flask, with_db_and_bpmn_file_cleanup: None
|
||||
) -> None:
|
||||
"""Test_can_change_asterisk_to_percent_on_creation."""
|
||||
permission_target = PermissionTargetModel(uri="/test_group/*")
|
||||
db.session.add(permission_target)
|
||||
db.session.commit()
|
||||
assert isinstance(permission_target.id, int)
|
||||
assert permission_target.uri == "/test_group/%"
|
||||
|
Loading…
x
Reference in New Issue
Block a user