merged in main and resolved conflicts w/ burnettk
This commit is contained in:
commit
93c4562944
|
@ -1,3 +1,5 @@
|
|||
from __future__ import with_statement
|
||||
|
||||
import logging
|
||||
from logging.config import fileConfig
|
||||
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
"""empty message
|
||||
|
||||
Revision ID: 682172cae89a
|
||||
Revision ID: 520f596b4f06
|
||||
Revises:
|
||||
Create Date: 2022-10-13 13:03:12.246235
|
||||
Create Date: 2022-10-13 15:02:03.824896
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
|
@ -10,7 +10,7 @@ import sqlalchemy as sa
|
|||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '682172cae89a'
|
||||
revision = '520f596b4f06'
|
||||
down_revision = None
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
@ -233,14 +233,6 @@ def upgrade():
|
|||
sa.PrimaryKeyConstraint('id'),
|
||||
sa.UniqueConstraint('principal_id', 'permission_target_id', 'permission', name='permission_assignment_uniq')
|
||||
)
|
||||
op.create_table('secret_allowed_process',
|
||||
sa.Column('id', sa.Integer(), nullable=False),
|
||||
sa.Column('secret_id', sa.Integer(), nullable=False),
|
||||
sa.Column('allowed_relative_path', sa.String(length=500), nullable=False),
|
||||
sa.ForeignKeyConstraint(['secret_id'], ['secret.id'], ),
|
||||
sa.PrimaryKeyConstraint('id'),
|
||||
sa.UniqueConstraint('secret_id', 'allowed_relative_path', name='unique_secret_path')
|
||||
)
|
||||
op.create_table('spiff_logging',
|
||||
sa.Column('id', sa.Integer(), nullable=False),
|
||||
sa.Column('process_instance_id', sa.Integer(), nullable=False),
|
||||
|
@ -313,7 +305,6 @@ def downgrade():
|
|||
op.drop_table('data_store')
|
||||
op.drop_table('task_event')
|
||||
op.drop_table('spiff_logging')
|
||||
op.drop_table('secret_allowed_process')
|
||||
op.drop_table('permission_assignment')
|
||||
op.drop_table('message_instance')
|
||||
op.drop_index(op.f('ix_message_correlation_value'), table_name='message_correlation')
|
|
@ -2965,7 +2965,18 @@ py = [
|
|||
{file = "py-1.11.0.tar.gz", hash = "sha256:51c75c4126074b472f746a24399ad32f6053d1b34b68d2fa41e558e6f4a98719"},
|
||||
]
|
||||
pyasn1 = [
|
||||
{file = "pyasn1-0.4.8-py2.4.egg", hash = "sha256:fec3e9d8e36808a28efb59b489e4528c10ad0f480e57dcc32b4de5c9d8c9fdf3"},
|
||||
{file = "pyasn1-0.4.8-py2.5.egg", hash = "sha256:0458773cfe65b153891ac249bcf1b5f8f320b7c2ce462151f8fa74de8934becf"},
|
||||
{file = "pyasn1-0.4.8-py2.6.egg", hash = "sha256:5c9414dcfede6e441f7e8f81b43b34e834731003427e5b09e4e00e3172a10f00"},
|
||||
{file = "pyasn1-0.4.8-py2.7.egg", hash = "sha256:6e7545f1a61025a4e58bb336952c5061697da694db1cae97b116e9c46abcf7c8"},
|
||||
{file = "pyasn1-0.4.8-py2.py3-none-any.whl", hash = "sha256:39c7e2ec30515947ff4e87fb6f456dfc6e84857d34be479c9d4a4ba4bf46aa5d"},
|
||||
{file = "pyasn1-0.4.8-py3.1.egg", hash = "sha256:78fa6da68ed2727915c4767bb386ab32cdba863caa7dbe473eaae45f9959da86"},
|
||||
{file = "pyasn1-0.4.8-py3.2.egg", hash = "sha256:08c3c53b75eaa48d71cf8c710312316392ed40899cb34710d092e96745a358b7"},
|
||||
{file = "pyasn1-0.4.8-py3.3.egg", hash = "sha256:03840c999ba71680a131cfaee6fab142e1ed9bbd9c693e285cc6aca0d555e576"},
|
||||
{file = "pyasn1-0.4.8-py3.4.egg", hash = "sha256:7ab8a544af125fb704feadb008c99a88805126fb525280b2270bb25cc1d78a12"},
|
||||
{file = "pyasn1-0.4.8-py3.5.egg", hash = "sha256:e89bf84b5437b532b0803ba5c9a5e054d21fec423a89952a74f87fa2c9b7bce2"},
|
||||
{file = "pyasn1-0.4.8-py3.6.egg", hash = "sha256:014c0e9976956a08139dc0712ae195324a75e142284d5f87f1a87ee1b068a359"},
|
||||
{file = "pyasn1-0.4.8-py3.7.egg", hash = "sha256:99fcc3c8d804d1bc6d9a099921e39d827026409a58f2a720dcdb89374ea0c776"},
|
||||
{file = "pyasn1-0.4.8.tar.gz", hash = "sha256:aef77c9fb94a3ac588e87841208bdec464471d9871bd5050a287cc9a475cd0ba"},
|
||||
]
|
||||
pycodestyle = [
|
||||
|
|
|
@ -1193,41 +1193,6 @@ paths:
|
|||
"404":
|
||||
description: Secret does not exist
|
||||
|
||||
/secrets/allowed_process_paths:
|
||||
post:
|
||||
operationId: spiffworkflow_backend.routes.process_api_blueprint.add_allowed_process_path
|
||||
summary: Create an allowed process to a secret
|
||||
tags:
|
||||
- Secrets
|
||||
requestBody:
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/SecretAllowedProcessPath"
|
||||
responses:
|
||||
"201":
|
||||
description: Allowed process created successfully
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/SecretAllowedProcessPath"
|
||||
/secrets/allowed_process_paths/{allowed_process_path_id}:
|
||||
parameters:
|
||||
- name: allowed_process_path_id
|
||||
in: path
|
||||
required: true
|
||||
description: The id of the allowed process path to delete
|
||||
schema:
|
||||
type: integer
|
||||
delete:
|
||||
operationId: spiffworkflow_backend.routes.process_api_blueprint.delete_allowed_process_path
|
||||
summary: Delete an existing allowed process for a secret
|
||||
tags:
|
||||
- Secrets
|
||||
responses:
|
||||
"204":
|
||||
description: The allowed process is deleted.
|
||||
|
||||
components:
|
||||
securitySchemes:
|
||||
jwt:
|
||||
|
@ -1993,12 +1958,6 @@ components:
|
|||
type: number
|
||||
example: 1
|
||||
nullable: false
|
||||
allowed_processes:
|
||||
description: The processes allowed to access this secret
|
||||
type: array
|
||||
items:
|
||||
$ref: "#/components/schemas/SecretAllowedProcessPath"
|
||||
nullable: true
|
||||
ProcessInstanceLog:
|
||||
properties:
|
||||
id:
|
||||
|
@ -2030,18 +1989,3 @@ components:
|
|||
description: The timestamp returned in the log
|
||||
type: number
|
||||
example: 123456789.12345
|
||||
SecretAllowedProcessPath:
|
||||
properties:
|
||||
id:
|
||||
description: The id of the allowed process path
|
||||
type: number
|
||||
example: 1
|
||||
nullable: true
|
||||
secret_id:
|
||||
description: The id of the secret associated with this allowed process path
|
||||
type: number
|
||||
example: 2
|
||||
allowed_relative_path:
|
||||
description: The allowed process path
|
||||
type: string
|
||||
example: /group_one/group_two/model_a
|
||||
|
|
|
@ -1,4 +1,9 @@
|
|||
"""Staging."""
|
||||
from os import environ
|
||||
|
||||
GIT_COMMIT_ON_SAVE = True
|
||||
GIT_COMMIT_USERNAME = "staging"
|
||||
GIT_COMMIT_EMAIL = "staging@example.com"
|
||||
SPIFFWORKFLOW_BACKEND_PERMISSIONS_FILE_NAME = environ.get(
|
||||
"SPIFFWORKFLOW_BACKEND_PERMISSIONS_FILE_NAME", default="staging.yml"
|
||||
)
|
||||
|
|
|
@ -45,9 +45,6 @@ from spiffworkflow_backend.models.process_instance import (
|
|||
from spiffworkflow_backend.models.process_instance_report import (
|
||||
ProcessInstanceReportModel,
|
||||
) # noqa: F401
|
||||
from spiffworkflow_backend.models.secret_model import (
|
||||
SecretAllowedProcessPathModel,
|
||||
) # noqa: F401
|
||||
from spiffworkflow_backend.models.secret_model import SecretModel # noqa: F401
|
||||
from spiffworkflow_backend.models.spiff_logging import SpiffLoggingModel # noqa: F401
|
||||
from spiffworkflow_backend.models.task_event import TaskEventModel # noqa: F401
|
||||
|
|
|
@ -5,8 +5,6 @@ from flask_bpmn.models.db import db
|
|||
from flask_bpmn.models.db import SpiffworkflowBaseDBModel
|
||||
from marshmallow import Schema
|
||||
from sqlalchemy import ForeignKey
|
||||
from sqlalchemy.orm import relationship
|
||||
from sqlalchemy.orm import RelationshipProperty
|
||||
|
||||
from spiffworkflow_backend.models.user import UserModel
|
||||
|
||||
|
@ -21,29 +19,6 @@ class SecretModel(SpiffworkflowBaseDBModel):
|
|||
value: str = db.Column(db.String(255), nullable=False)
|
||||
creator_user_id: int = db.Column(ForeignKey(UserModel.id), nullable=False)
|
||||
|
||||
allowed_processes: RelationshipProperty = relationship(
|
||||
"SecretAllowedProcessPathModel", cascade="delete"
|
||||
)
|
||||
|
||||
|
||||
@dataclass()
|
||||
class SecretAllowedProcessPathModel(SpiffworkflowBaseDBModel):
|
||||
"""Allowed processes can be Process Groups or Process Models.
|
||||
|
||||
We store the path in either case.
|
||||
"""
|
||||
|
||||
__tablename__ = "secret_allowed_process"
|
||||
__table_args__ = (
|
||||
db.UniqueConstraint(
|
||||
"secret_id", "allowed_relative_path", name="unique_secret_path"
|
||||
),
|
||||
)
|
||||
|
||||
id: int = db.Column(db.Integer, primary_key=True)
|
||||
secret_id: int = db.Column(ForeignKey(SecretModel.id), nullable=False) # type: ignore
|
||||
allowed_relative_path: str = db.Column(db.String(500), nullable=False)
|
||||
|
||||
|
||||
class SecretModelSchema(Schema):
|
||||
"""SecretModelSchema."""
|
||||
|
@ -52,14 +27,4 @@ class SecretModelSchema(Schema):
|
|||
"""Meta."""
|
||||
|
||||
model = SecretModel
|
||||
fields = ["key", "value", "creator_user_id", "allowed_processes"]
|
||||
|
||||
|
||||
class SecretAllowedProcessSchema(Schema):
|
||||
"""SecretAllowedProcessSchema."""
|
||||
|
||||
class Meta:
|
||||
"""Meta."""
|
||||
|
||||
model = SecretAllowedProcessPathModel
|
||||
fields = ["secret_id", "allowed_relative_path"]
|
||||
fields = ["key", "value", "creator_user_id"]
|
||||
|
|
|
@ -48,7 +48,6 @@ from spiffworkflow_backend.models.process_instance_report import (
|
|||
)
|
||||
from spiffworkflow_backend.models.process_model import ProcessModelInfo
|
||||
from spiffworkflow_backend.models.process_model import ProcessModelInfoSchema
|
||||
from spiffworkflow_backend.models.secret_model import SecretAllowedProcessSchema
|
||||
from spiffworkflow_backend.models.secret_model import SecretModel
|
||||
from spiffworkflow_backend.models.secret_model import SecretModelSchema
|
||||
from spiffworkflow_backend.models.spiff_logging import SpiffLoggingModel
|
||||
|
@ -1330,7 +1329,9 @@ def prepare_form_data(
|
|||
|
||||
def render_jinja_template(unprocessed_template: str, data: dict[str, Any]) -> str:
|
||||
"""Render_jinja_template."""
|
||||
jinja_environment = jinja2.Environment(autoescape=True)
|
||||
jinja_environment = jinja2.Environment(
|
||||
autoescape=True, lstrip_blocks=True, trim_blocks=True
|
||||
)
|
||||
template = jinja_environment.from_string(unprocessed_template)
|
||||
return template.render(**data)
|
||||
|
||||
|
@ -1413,24 +1414,6 @@ def delete_secret(key: str) -> Response:
|
|||
return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")
|
||||
|
||||
|
||||
def add_allowed_process_path(body: dict) -> Response:
|
||||
"""Get allowed process paths."""
|
||||
allowed_process_path = SecretService.add_allowed_process(
|
||||
body["secret_id"], g.user.id, body["allowed_relative_path"]
|
||||
)
|
||||
return Response(
|
||||
json.dumps(SecretAllowedProcessSchema().dump(allowed_process_path)),
|
||||
status=201,
|
||||
mimetype="application/json",
|
||||
)
|
||||
|
||||
|
||||
def delete_allowed_process_path(allowed_process_path_id: int) -> Response:
|
||||
"""Get allowed process paths."""
|
||||
SecretService().delete_allowed_process(allowed_process_path_id, g.user.id)
|
||||
return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")
|
||||
|
||||
|
||||
def _get_required_parameter_or_raise(parameter: str, post_body: dict[str, Any]) -> Any:
|
||||
"""Get_required_parameter_or_raise."""
|
||||
return_value = None
|
||||
|
|
|
@ -3,9 +3,7 @@ from typing import Optional
|
|||
|
||||
from flask_bpmn.api.api_error import ApiError
|
||||
from flask_bpmn.models.db import db
|
||||
from sqlalchemy.exc import IntegrityError
|
||||
|
||||
from spiffworkflow_backend.models.secret_model import SecretAllowedProcessPathModel
|
||||
from spiffworkflow_backend.models.secret_model import SecretModel
|
||||
|
||||
# from cryptography.fernet import Fernet
|
||||
|
@ -125,85 +123,3 @@ class SecretService:
|
|||
message=f"Cannot delete secret with key: {key}. Resource does not exist.",
|
||||
status_code=404,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def add_allowed_process(
|
||||
secret_id: int, user_id: str, allowed_relative_path: str
|
||||
) -> SecretAllowedProcessPathModel:
|
||||
"""Add_allowed_process."""
|
||||
secret_model = SecretModel.query.filter(SecretModel.id == secret_id).first()
|
||||
if secret_model:
|
||||
if secret_model.creator_user_id == user_id:
|
||||
secret_process_model = SecretAllowedProcessPathModel(
|
||||
secret_id=secret_model.id,
|
||||
allowed_relative_path=allowed_relative_path,
|
||||
)
|
||||
assert secret_process_model # noqa: S101
|
||||
db.session.add(secret_process_model)
|
||||
try:
|
||||
db.session.commit()
|
||||
except IntegrityError as ie:
|
||||
db.session.rollback()
|
||||
raise ApiError(
|
||||
error_code="add_allowed_process_error",
|
||||
message=f"Error adding allowed_process with secret {secret_model.id}, "
|
||||
f"and path: {allowed_relative_path}. Resource already exists. "
|
||||
f"Original error is {ie}",
|
||||
status_code=409,
|
||||
) from ie
|
||||
except Exception as e:
|
||||
# TODO: should we call db.session.rollback() here?
|
||||
# db.session.rollback()
|
||||
raise ApiError(
|
||||
error_code="add_allowed_process_error",
|
||||
message=f"Could not create an allowed process for secret with key: {secret_model.key} "
|
||||
f"with path: {allowed_relative_path}. "
|
||||
f"Original error is {e}",
|
||||
) from e
|
||||
return secret_process_model
|
||||
else:
|
||||
raise ApiError(
|
||||
error_code="add_allowed_process_error",
|
||||
message=f"User: {user_id} cannot modify the secret with key : {secret_model.key}",
|
||||
status_code=401,
|
||||
)
|
||||
else:
|
||||
raise ApiError(
|
||||
error_code="add_allowed_process_error",
|
||||
message=f"Cannot add allowed process to secret with key: {secret_id}. Resource does not exist.",
|
||||
status_code=404,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def delete_allowed_process(allowed_process_id: int, user_id: int) -> None:
|
||||
"""Delete_allowed_process."""
|
||||
allowed_process = SecretAllowedProcessPathModel.query.filter(
|
||||
SecretAllowedProcessPathModel.id == allowed_process_id
|
||||
).first()
|
||||
if allowed_process:
|
||||
secret = SecretModel.query.filter(
|
||||
SecretModel.id == allowed_process.secret_id
|
||||
).first()
|
||||
assert secret # noqa: S101
|
||||
if secret.creator_user_id == user_id:
|
||||
db.session.delete(allowed_process)
|
||||
try:
|
||||
db.session.commit()
|
||||
except Exception as e:
|
||||
raise ApiError(
|
||||
error_code="delete_allowed_process_error",
|
||||
message=f"There was an exception deleting allowed_process: {allowed_process_id}. "
|
||||
f"Original error is: {e}",
|
||||
) from e
|
||||
else:
|
||||
raise ApiError(
|
||||
error_code="delete_allowed_process_error",
|
||||
message=f"User: {user_id} cannot delete the allowed_process with id : {allowed_process_id}",
|
||||
status_code=401,
|
||||
)
|
||||
else:
|
||||
raise ApiError(
|
||||
error_code="delete_allowed_process_error",
|
||||
message=f"Cannot delete allowed_process: {allowed_process_id}. Resource does not exist.",
|
||||
status_code=404,
|
||||
)
|
||||
|
|
|
@ -10,11 +10,9 @@ from tests.spiffworkflow_backend.helpers.base_test import BaseTest
|
|||
from werkzeug.test import TestResponse
|
||||
|
||||
from spiffworkflow_backend.models.process_model import ProcessModelInfo
|
||||
from spiffworkflow_backend.models.secret_model import SecretAllowedProcessPathModel
|
||||
from spiffworkflow_backend.models.secret_model import SecretModel
|
||||
from spiffworkflow_backend.models.secret_model import SecretModelSchema
|
||||
from spiffworkflow_backend.models.user import UserModel
|
||||
from spiffworkflow_backend.services.file_system_service import FileSystemService
|
||||
from spiffworkflow_backend.services.process_model_service import ProcessModelService
|
||||
from spiffworkflow_backend.services.secret_service import SecretService
|
||||
|
||||
|
@ -57,23 +55,6 @@ class SecretServiceTestHelpers(BaseTest):
|
|||
)
|
||||
return process_model_info
|
||||
|
||||
def add_test_secret_allowed_process(
|
||||
self, client: FlaskClient, user: UserModel
|
||||
) -> SecretAllowedProcessPathModel:
|
||||
"""Add_test_secret_allowed_process."""
|
||||
process_model_info = self.add_test_process(client, user)
|
||||
process_model_relative_path = FileSystemService.process_model_relative_path(
|
||||
process_model_info
|
||||
)
|
||||
|
||||
test_secret = self.add_test_secret(user)
|
||||
allowed_process_model = SecretService().add_allowed_process(
|
||||
secret_id=test_secret.id,
|
||||
user_id=user.id,
|
||||
allowed_relative_path=process_model_relative_path,
|
||||
)
|
||||
return allowed_process_model
|
||||
|
||||
|
||||
class TestSecretService(SecretServiceTestHelpers):
|
||||
"""TestSecretService."""
|
||||
|
@ -227,176 +208,6 @@ class TestSecretService(SecretServiceTestHelpers):
|
|||
SecretService.delete_secret(self.test_key + "x", with_super_admin_user.id)
|
||||
assert "Resource does not exist" in ae.value.message
|
||||
|
||||
def test_secret_add_allowed_process(
|
||||
self,
|
||||
app: Flask,
|
||||
client: FlaskClient,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
with_super_admin_user: UserModel,
|
||||
) -> None:
|
||||
"""Test_secret_add_allowed_process."""
|
||||
test_secret = self.add_test_secret(with_super_admin_user)
|
||||
process_model_info = self.add_test_process(client, with_super_admin_user)
|
||||
|
||||
process_model_relative_path = FileSystemService.process_model_relative_path(
|
||||
process_model_info
|
||||
)
|
||||
allowed_process_model = SecretService().add_allowed_process(
|
||||
secret_id=test_secret.id,
|
||||
user_id=with_super_admin_user.id,
|
||||
allowed_relative_path=process_model_relative_path,
|
||||
)
|
||||
|
||||
assert allowed_process_model is not None
|
||||
assert isinstance(allowed_process_model, SecretAllowedProcessPathModel)
|
||||
assert allowed_process_model.secret_id == test_secret.id
|
||||
assert (
|
||||
allowed_process_model.allowed_relative_path == process_model_relative_path
|
||||
)
|
||||
|
||||
assert len(test_secret.allowed_processes) == 1
|
||||
assert test_secret.allowed_processes[0] == allowed_process_model
|
||||
|
||||
def test_secret_add_allowed_process_same_process_fails(
|
||||
self,
|
||||
app: Flask,
|
||||
client: FlaskClient,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
with_super_admin_user: UserModel,
|
||||
) -> None:
|
||||
"""Do not allow duplicate entries for secret_id/allowed_relative_path pairs.
|
||||
|
||||
We actually take care of this in the db model with a unique constraint
|
||||
on the 2 columns.
|
||||
"""
|
||||
test_secret = self.add_test_secret(with_super_admin_user)
|
||||
process_model_info = self.add_test_process(client, with_super_admin_user)
|
||||
|
||||
process_model_relative_path = FileSystemService.process_model_relative_path(
|
||||
process_model_info
|
||||
)
|
||||
SecretService().add_allowed_process(
|
||||
secret_id=test_secret.id,
|
||||
user_id=with_super_admin_user.id,
|
||||
allowed_relative_path=process_model_relative_path,
|
||||
)
|
||||
allowed_processes = SecretAllowedProcessPathModel.query.all()
|
||||
assert len(allowed_processes) == 1
|
||||
|
||||
with pytest.raises(ApiError) as ae:
|
||||
SecretService().add_allowed_process(
|
||||
secret_id=test_secret.id,
|
||||
user_id=with_super_admin_user.id,
|
||||
allowed_relative_path=process_model_relative_path,
|
||||
)
|
||||
assert "Resource already exists" in ae.value.message
|
||||
|
||||
def test_secret_add_allowed_process_bad_user_fails(
|
||||
self,
|
||||
app: Flask,
|
||||
client: FlaskClient,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
with_super_admin_user: UserModel,
|
||||
) -> None:
|
||||
"""Test_secret_add_allowed_process_bad_user."""
|
||||
process_model_info = self.add_test_process(client, with_super_admin_user)
|
||||
process_model_relative_path = FileSystemService.process_model_relative_path(
|
||||
process_model_info
|
||||
)
|
||||
test_secret = self.add_test_secret(with_super_admin_user)
|
||||
with pytest.raises(ApiError) as ae:
|
||||
SecretService().add_allowed_process(
|
||||
secret_id=test_secret.id,
|
||||
user_id=with_super_admin_user.id + 1,
|
||||
allowed_relative_path=process_model_relative_path,
|
||||
)
|
||||
assert (
|
||||
ae.value.message
|
||||
== f"User: {with_super_admin_user.id+1} cannot modify the secret with key : {self.test_key}"
|
||||
)
|
||||
|
||||
def test_secret_add_allowed_process_bad_secret_fails(
|
||||
self,
|
||||
app: Flask,
|
||||
client: FlaskClient,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
with_super_admin_user: UserModel,
|
||||
) -> None:
|
||||
"""Test_secret_add_allowed_process_bad_secret_fails."""
|
||||
process_model_info = self.add_test_process(client, with_super_admin_user)
|
||||
process_model_relative_path = FileSystemService.process_model_relative_path(
|
||||
process_model_info
|
||||
)
|
||||
test_secret = self.add_test_secret(with_super_admin_user)
|
||||
|
||||
with pytest.raises(ApiError) as ae:
|
||||
SecretService().add_allowed_process(
|
||||
secret_id=test_secret.id + 1,
|
||||
user_id=with_super_admin_user.id,
|
||||
allowed_relative_path=process_model_relative_path,
|
||||
)
|
||||
assert "Resource does not exist" in ae.value.message
|
||||
|
||||
def test_secret_delete_allowed_process(
|
||||
self,
|
||||
app: Flask,
|
||||
client: FlaskClient,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
with_super_admin_user: UserModel,
|
||||
) -> None:
|
||||
"""Test_secret_delete_allowed_process."""
|
||||
allowed_process_model = self.add_test_secret_allowed_process(
|
||||
client, with_super_admin_user
|
||||
)
|
||||
|
||||
allowed_processes = SecretAllowedProcessPathModel.query.all()
|
||||
assert len(allowed_processes) == 1
|
||||
|
||||
SecretService().delete_allowed_process(
|
||||
allowed_process_model.id, with_super_admin_user.id
|
||||
)
|
||||
|
||||
allowed_processes = SecretAllowedProcessPathModel.query.all()
|
||||
assert len(allowed_processes) == 0
|
||||
|
||||
def test_secret_delete_allowed_process_bad_user_fails(
|
||||
self,
|
||||
app: Flask,
|
||||
client: FlaskClient,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
with_super_admin_user: UserModel,
|
||||
) -> None:
|
||||
"""Test_secret_delete_allowed_process_bad_user_fails."""
|
||||
allowed_process_model = self.add_test_secret_allowed_process(
|
||||
client, with_super_admin_user
|
||||
)
|
||||
with pytest.raises(ApiError) as ae:
|
||||
SecretService().delete_allowed_process(
|
||||
allowed_process_model.id, with_super_admin_user.id + 1
|
||||
)
|
||||
message = ae.value.message
|
||||
assert (
|
||||
f"User: {with_super_admin_user.id+1} cannot delete the allowed_process with id : {allowed_process_model.id}"
|
||||
in message
|
||||
)
|
||||
|
||||
def test_secret_delete_allowed_process_bad_allowed_process_fails(
|
||||
self,
|
||||
app: Flask,
|
||||
client: FlaskClient,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
with_super_admin_user: UserModel,
|
||||
) -> None:
|
||||
"""Test_secret_delete_allowed_process_bad_allowed_process_fails."""
|
||||
allowed_process_model = self.add_test_secret_allowed_process(
|
||||
client, with_super_admin_user
|
||||
)
|
||||
with pytest.raises(ApiError) as ae:
|
||||
SecretService().delete_allowed_process(
|
||||
allowed_process_model.id + 1, with_super_admin_user.id
|
||||
)
|
||||
assert "Resource does not exist" in ae.value.message
|
||||
|
||||
|
||||
class TestSecretServiceApi(SecretServiceTestHelpers):
|
||||
"""TestSecretServiceApi."""
|
||||
|
@ -536,60 +347,3 @@ class TestSecretServiceApi(SecretServiceTestHelpers):
|
|||
headers=self.logged_in_headers(with_super_admin_user),
|
||||
)
|
||||
assert secret_response.status_code == 404
|
||||
|
||||
def test_add_secret_allowed_process(
|
||||
self,
|
||||
app: Flask,
|
||||
client: FlaskClient,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
with_super_admin_user: UserModel,
|
||||
) -> None:
|
||||
"""Test add secret allowed process."""
|
||||
test_secret = self.add_test_secret(with_super_admin_user)
|
||||
process_model_info = self.add_test_process(client, with_super_admin_user)
|
||||
process_model_relative_path = FileSystemService.process_model_relative_path(
|
||||
process_model_info
|
||||
)
|
||||
data = {
|
||||
"secret_id": test_secret.id,
|
||||
"allowed_relative_path": process_model_relative_path,
|
||||
}
|
||||
response: TestResponse = client.post(
|
||||
"/v1.0/secrets/allowed_process_paths",
|
||||
headers=self.logged_in_headers(with_super_admin_user),
|
||||
content_type="application/json",
|
||||
data=json.dumps(data),
|
||||
)
|
||||
assert response.status_code == 201
|
||||
allowed_processes = SecretAllowedProcessPathModel.query.all()
|
||||
assert len(allowed_processes) == 1
|
||||
assert allowed_processes[0].allowed_relative_path == process_model_relative_path
|
||||
assert allowed_processes[0].secret_id == test_secret.id
|
||||
|
||||
def test_delete_secret_allowed_process(
|
||||
self,
|
||||
app: Flask,
|
||||
client: FlaskClient,
|
||||
with_db_and_bpmn_file_cleanup: None,
|
||||
with_super_admin_user: UserModel,
|
||||
) -> None:
|
||||
"""Test delete secret allowed process."""
|
||||
test_secret = self.add_test_secret(with_super_admin_user)
|
||||
process_model_info = self.add_test_process(client, with_super_admin_user)
|
||||
process_model_relative_path = FileSystemService.process_model_relative_path(
|
||||
process_model_info
|
||||
)
|
||||
allowed_process = SecretService.add_allowed_process(
|
||||
test_secret.id, with_super_admin_user.id, process_model_relative_path
|
||||
)
|
||||
allowed_processes = SecretAllowedProcessPathModel.query.all()
|
||||
assert len(allowed_processes) == 1
|
||||
assert allowed_processes[0].secret_id == test_secret.id
|
||||
assert allowed_processes[0].allowed_relative_path == process_model_relative_path
|
||||
response = client.delete(
|
||||
f"/v1.0/secrets/allowed_process_paths/{allowed_process.id}",
|
||||
headers=self.logged_in_headers(with_super_admin_user),
|
||||
)
|
||||
assert response.status_code == 200
|
||||
allowed_processes = SecretAllowedProcessPathModel.query.all()
|
||||
assert len(allowed_processes) == 0
|
||||
|
|
Loading…
Reference in New Issue